Kafka streams: Primera aplicación con Kafka streams


El primer ejemplo que escribiremos será una topología que leerá mensajes de un topic de Kafka y contará el número de palabras en el mismo emitiendo el resultado a un topic de salida, para hacerlo se realizarán las siguientes operaciones:

  • Lee un mensaje de kafka
  • Transforma el mensaje en minúsculas
  • Separa el mensaje por espacios en un arreglo de Strings
  • Selecciona la palabra como una llave
  • Agrupa los mensajes por el nombre de la llave
  • Emite el resultado a un topic de salida

Para hacer lo anterior seguiremos los siguientes pasos:

1 Iniciar el cluster de Zookeeper y Kafka

El primer paso será iniciar nuestro cluster de Kafka con los siguientes comandos:

Zookeeper

sh zookeeper-server-start.sh ../config/zookeeper.properties

Kafka

sh kafka-server-start.sh ../config/server.properties

Con lo anterior tendremos un cluster de Kafka y Zookeeper corriendo en nuestra máquina.

2 Crear los topics de Kafka a utilizar

En este ejemplo utilizaremos 2 topics, un topic de entrada y uno de salida, así que el siguiente paso es crearlos, para hacerlo ejecutaremos los siguientes comandos:

sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic input-topic
sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic output-topic

Con lo anterior contaremos con dos topics input-topic y output-topic cada uno con 1 replica y 2 particiones. Si deseas confirmar que los topics se crearon correctamente, puedes ejecutar el siguiente comando para listar los topics:

sh kafka-topics.sh --list --zookeeper localhost:2181

Esto te mostrará los topics que existen en tu cluster.

Paso 3 Configuración del proyecto

Una vez que ya iniciamos nuestro cluster y creamos nuestros topics, el siguiente paso será crear nuestro proyecto de Java que leerá de un topic, hará modificaciones a la información y colocará el resultado en un topic de salida.

Para esto agregaremos las siguientes dependencias a nuestro proyecto:

pom.xml

Paso 4 Programando nuestra topología

Una vez que ya tenemos configurado nuestro proyecto, el siguiente paso será programar nuestra topología, veamos el siguiente código:


import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class StreammingApplication {

	public static Properties getKafkaConfiguration() {
		Properties props = new Properties();
		props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-app");
		props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
		props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
		props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
		return props;
	}

	public static void main(String[] args) {
		KStreamBuilder builder = new KStreamBuilder();

		KStream wordCountsInput = builder.stream("input-topic");
		KTable counts = wordCountsInput.mapValues(String::toLowerCase)
				.flatMapValues(value -> Arrays.asList(value.split(" "))).selectKey((ignoredKey, word) -> word)
				.groupByKey().count();

		counts.to(Serdes.String(), Serdes.Long(), "output-topic");

		KafkaStreams streams = new KafkaStreams(builder, getKafkaConfiguration());
		streams.start();
		Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
	}
}

En el código anterior podemos ver lo siguiente:

  • Hay un método que devuelve la configuración de Kafka a utilizar
  • Creamos un objeto de tipo KStreamBuilder que nos permitirá definir nuestro topología de Kafka streams.
  • KStreamBuilder.stream() devuelve un flujo de entrada, el cual es básicamente un consumer, a una referencia de tipo KStream<String, String> contiene 2 genéricos porque uno será para la llave del mensaje y el otro para el valor.
  • KStream nos permitirá realizar las siguientes operaciones:
    • mapValues(..) : Lo utilizaremos para transformar nuestro mensaje a minúsculas.
    • flatMapValues(..): Lo utilizaremos para separar nuestro mensaje en multiples mensajes, uno por cada palabra.

    • selectKey(..) : Nos permitirá seleccionar una llave, lo utilizaremos para utilizar la palabra como llave de nuestro mensaje.

    • groupByKey() : Nos permitirá agrupar por una llave, en este caso será la palabra.

    • count(): Nos permitirá el número de mensajes.
  • Lo anterior nos devolverá un objeto de tipo KTable<String,Long> como respuesta en el primer genérico tendremos las palabras y en el segundo el conteo.
  • Utilizaremos counts.to(Serdes.String(), Serdes.Long(), “output-topic”); para emitir la respuesta a otro topic llamado “output-topic“, utilizamos Serdes.String(), Serdes.Long() para definir el topi de dato de la llave y el valor del mensaje de salida.

Paso 5 probando nuestra aplicación

Una vez que ya entendimos el código el siguiente paso será probarlo, para esto abriremos 2 consumers y un producer del siguiente modo:

  • Consumer 1 : Leerá los mensajes que se envíen al topic “input-topic“.
  • Consumer 2: Leerá los mensajes que se envíen al topic “output-topic
  • Producer 1: Enviará mensajes al topic “input-topic

Para hacerlo utilizaremos los siguientes comandos en diferentes terminales:

Consumer1

 sh kafka-console-consumer.sh --zookeeper localhost:2181  --topic input-topic

Consumer2

 sh kafka-console-consumer.sh --zookeeper localhost:2181  --topic output-topic --formatter kafka.tools.DefaultMessageFormatter --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --property print.key=true

Producer1

sh kafka-console-producer.sh --broker-list localhost:9092 --topic input-topic

Como podemos ver el consumer 2 es un poco más complejo que el uno ya que especificaremos serializers y deserializers diferentes a los que están por defecto, más adelante hablaremos sobre esto.

Una vez que ya contamos con nuestros producers y consumers listos tendremos que ejecutar nuestra aplicación como cualquier otra aplicación java, una vez hecho esto el último paso será escribir mensajes y presionar enter en la consola donde tenemos el producer, en nuestro caso escribiremos el mensaje:

devs4j es el mejor sitio web de programación lo buscaré en Facebook y Twitter y los seguiré

Una vez hecho esto, veremos en el Consumer1 lo siguiente:

devs4j es el mejor sitio web de programación lo buscaré en Facebook y Twitter y los seguiré

Este es el mensaje tal cuál que enviamos, lo interesante viene cuando vemos el Consumer2:

devs4j	1
es	3
sitio	1
web	1
y	2
seguiré	1
el	1
mejor	1
de	1
programación	1
lo	1
buscaré	1
en	1
facebook	1
twitter	1
los	1

Como vemos la frase se separó en diferentes mensajes cada uno con su propio número de ocurrencias.

Como vemos Kafka streams es muy fácil de utilizar y es posible realizar transformación a grandes flujos de información en tiempo real.

Si quieres ver el código completo puedes verlo en el siguiente enlace https://github.com/raidentrance/kafka-streams-example.

En este ejemplo vimos algo básico de Kafka streams en futuros posts veremos ejemplos cada vez más complejos, síguenos en nuestras redes sociales para enterarte sobre nuevo contenido https://www.facebook.com/devs4j/ y https://twitter.com/devs4j.

Apache Kafka: Producers


El punto más importante al utilizar Apache Kafka es el envío y consumo de mensajes, en este post explicaremos como funcionan los producers y consumers, si tienes dudas sobre como instalar tu cluster de kafka te recomendamos el post Primeros pasos con Apache Kafka en Español !.

Producers

Un producer es un componente que escribe mensajes en un topic, para poder hacerlo requiere de la siguiente información:

  • Uno o muchos brokers de kafka
  • El nombre de un topic al cuál va a escribir
  • El mensaje que va a depositar en el topic

Acknowledges

La forma en la que podemos asegurar que un mensaje se entrego de forma exitosa es a través del uso de acknowledges, en los producers de kafka contamos con los siguientes:

  • acks=0 : Define que no es necesario recibir un ack por cada mensaje, esto mejora el performance de forma importante pero es posible perder mensajes.
  • acks=1 : Define que es necesario un ack del broker líder que recibe el mensaje, esto asegura que el líder recibió el mensaje pero no asegura que todas las replicas lo tienen.
  • acks=all : Define que es necesario un ack del broker líder y todas las replicas, esto asegura que tanto el líder como todas las replicas recibieron el mensaje.

Podemos concluir con lo anterior que entre menor sea el número de acks tendremos un mejor performance pero más probabilidades de perder datos y entre más acks tendremos un menor performance pero sin perdida de datos. Debemos considerar esto a la hora de diseñar nuestra aplicación.

Messages keys

Cuando utilizamos un producer podemos enviar cualquier información que necesitemos, pero recordemos que al contar con muchas particiones nosotros no podemos definir a que partición se mandará el mensaje, pero esto trae la pregunta, ¿Cómo puedo asegurar el orden de mis mensajes? La respuesta es Haciendo uso de message keys.

Adicional al mensaje es posible indicar un message key, con esto, todos los mensajes que cuenten con la misma message key se entregarán en la misma partición, si recordamos kafka asegura el orden de los mensajes que se encuentren dentro de la misma partición.

Programando nuestro producer

Una vez que entendemos la teoría el siguiente paso es programarlo, para esto programaremos nuestro kafka producer, para esto sigamos los siguientes pasos:

Paso 1 Configurando el proyecto

El primer paso será configurar nuestro proyecto, para esto agregaremos las siguientes entradas a nuestro archivo pom.xml:


	
		org.apache.kafka
		kafka-clients
		1.0.1
	


	
		
			maven-compiler-plugin
			3.2
			
				1.8
				1.8
			
		
	

Con lo anterior definiremos la dependencia a utilizar para crear un producer y el plugin para definir la versión 1.8 de java.

Paso 2 Definiendo de errores

El siguiente paso será crear algunas excepciones a utilizar en nuestro producer:


/**
 * @author raidentrance
 *
 */
public enum KafkaErrors {
	PRODUCER_NOT_CONNECTED("Not alive connection available");

	private String message;

	private KafkaErrors(String message) {
		this.message = message;
	}

	public String getMessage() {
		return message;
	}
}

La enumeración anterior será utilizada para definir los mensajes de error que se generen en nuestro producer.

/**
 * @author raidentrance
 *
 */
public class KafkaProducerException extends Exception {

	private static final long serialVersionUID = -2134618105767008561L;

	public KafkaProducerException(String message) {
		super(message);
	}
}

La clase anterior define la excepción que se propagará en caso de que una excepción con Kafka suceda.

Paso 3 Creando el kafka producer

Una ve que contamos con las clases necesarias crearemos un pequeño controller para enviar los mensajes a kafka.


import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.raidentrance.kafka.error.KafkaErrors;
import com.raidentrance.kafka.error.KafkaProducerException;

/**
 * @author raidentrance
 *
 */
public class KafkaMessageProducer {
	private Properties properties;
	private Producer producer;

	public KafkaMessageProducer(Properties properties) {
		this.properties = properties;
	}

	public void connect() {
		producer = new KafkaProducer(properties);
	}

	public void sendMessage(String topic, String key, String value) throws KafkaProducerException {
		if (producer != null) {
			producer.send(new ProducerRecord(topic, key, value));
		} else {
			throw new KafkaProducerException(KafkaErrors.PRODUCER_NOT_CONNECTED.getMessage());
		}
	}

	public void sendMessage(String topic, String value) throws KafkaProducerException {
		if (producer != null) {
			producer.send(new ProducerRecord(topic, value));
		} else {
			throw new KafkaProducerException(KafkaErrors.PRODUCER_NOT_CONNECTED.getMessage());
		}
	}

	public void disconnect() {
		producer.close();
	}
}

El controller anterior define los siguientes métodos:

  • El método connect será utilizado para crear nuestro producer
  • El método

    void sendMessage(String topic, String key, String value) :  Se utilizará para enviar mensajes al topic especificado indicando una llave.

  • El método

    void sendMessage(String topic, String value) : Se utilizará para enviar mensajes al topic especificado sin indicar una llave.

Paso 4 Probando todo junto

Una vez que tenemos todo listo el siguiente paso será enviar mensajes, para esto crearemos el siguiente método main:

public static void main(String[] args) throws KafkaProducerException {
	Properties props = new Properties();
	props.setProperty("bootstrap.servers", "kafka_host:9092");
	props.setProperty("key.serializer", StringSerializer.class.getName());
	props.setProperty("value.serializer", StringSerializer.class.getName());
	props.setProperty("acks", "1");
	props.setProperty("retries", "3");
	KafkaMessageProducer producer = new KafkaMessageProducer(props);
	producer.connect();
	producer.sendMessage("geeks-mexico", "geek-key", "raidentrance");
	producer.disconnect();
}

El código anterior muestra las propiedades necesarias para enviar y recibir mensajes a kafka y cómo mandar mensajes al topic geeks-mexico utilizando la llave geek-key el mensaje raidentrance.

Si te gusta el contenido y quieres enterarte cuando realicemos un post nuevo síguenos en nuestras redes sociales https://twitter.com/geeks_mx y https://www.facebook.com/geeksJavaMexico/.

Autor: Alejandro Agapito Bautista

Twitter: @raidentrance

Contacto:raidentrance@gmail.com

 

Apache Kafka : Brokers y réplica de datos


En el post anterior hablamos sobre el ecosistema de Apache Kafka en el post Apache Kafka: El ecosistema y los conceptos básicos, ahora es el turno de hablar de brokers y replicación de datos.

Brokers

Un cluster de Apache Kafka se compone de multiples brokers (servidores), cada uno de ellos contiene un id y  particiones. Una vez que te conectas a un broker, estarás conectado al cluster completo.

Un cluster de Kafka esta constituido de la siguiente forma:

brokers

Como se puede ver en la imagen tendremos lo siguiente:

  • Un cluster de Kafka puede tener multiples brokers
  • Un topic puede tener multiples particiones
  • Cada partición puede vivir en diferentes brokers
  • Un topic puede vivir el múltiples brokers

Replication factor

La traducción literal de replication factor es factor de replicas, esto se refiere a cuantas copias tendré de mi topic, a continuación se muestran algunos puntos a considerar para asignarlo:

  • Se debe asignar un replication factor mayor que cero
  • El número máximo de replicas es el número máximo de brokers en el servidor
  • Si un nodo está abajo, otro nodo que cuente con una replica puede entregar los datos
  • Las particiones también se replican

Veamos la siguiente imagen:

replication

Analicemos la imagen anterior:

  • Se tienen 3 brokers
  • Se tiene 1 topic llamado Topic 1
  • Topic 1 tiene 2 particiones
  • Topic 1 tiene un replication factor 2

Con lo anterior imaginemos los siguientes escenarios:

  1. Broker 1 se cae

broker1

Si esto sucede aun tendremos funcionando el Topic 1 debido a que la partición 0 se encuentra en broker 2 y la partición 1 se encuentra en el broker 3.

2. Broker 2 se cae

broker2

Si esto sucede el Topic 1 seguiría funcionando de forma correcta debido a que Broker 1 contiene la partición 0 y broker 3 contiene la partición 1.

3. Broker 3 se cae

broker3

Por último si el broker 3 se cae el Topic 1 seguiría funcionando de forma correcta debido a que la partición 0 se encuentra en el broker 1 y la partición 1 se encuentra en el broker 2.

Es importante mencionar que existen sistemas que requieren alta disponibilidad que cuentan hasta con 100 brokers funcionando de forma simultanea, esto nos puede dar una idea del nivel de confianza que les podemos dar.

Partition Leader

Como pueden ver un topic puede tener multiples particiones, esto trae el concepto de partition leader o partición líder. Solo un broker puede ser el líder para una partición en específico y solo ese líder puede enviar y recibir datos, los demás brokers solo se utilizarán para sincronizar los datos.

Si recordamos lo que mencionamos anteriormente si un topic tiene 3 particiones cada partición vivirá en un broker diferente, si a esto agregamos que un topic puede tener un replication factor digamos de 3 entonces tendremos el número de particiones multiplicadas por 3, esto significa que una de esas 3 particiones un broker será el líder y los demás se conocerán como ISR (In sync replica).

Si te gusta el contenido y quieres enterarte cuando realicemos un post nuevo síguenos en nuestras redes sociales https://twitter.com/geeks_mx y https://www.facebook.com/geeksJavaMexico/.

Autor: Alejandro Agapito Bautista

Twitter: @raidentrance

Contacto:raidentrance@gmail.com

Apache Kafka: El ecosistema y los conceptos básicos


En este post explicaremos el ecosistema de Apache Kafka, si aún no tienes configurado tu cluster te recomendamos leer el post Primeros pasos con Apache Kafka en Español !.

Antes de empezar veamos la siguiente imagen que representa el ecosistema básico de Kafka.ecosistema basico

En el diagrama anterior tenemos los siguientes componentes:

  • Una fuente de datos: esta puede ser un api, base de datos, motor de búsqueda, etc.
  • Producer API: Es un api que permite colocar mensajes en lugares llamados topics dentro de un cluster de Kafka.
  • Kafka / Zookeeper : Más adelante explicaremos más a detalle como funcionan estos dos componentes en el ecosistema pero por ahora consideremos que serán quienes almacenarán los mensajes en lugares llamados topics.
  • Consumer API: Es un api que permite leer mensajes desde los topics de kafka.
  • Target system: Sistema destino que recibirá la información que se está procesando.

Una vez que entendemos como funciona el ecosistema básico de Kafka ahora veamos el Kafka Extended API:

kafka extended.png

Veamos punto por punto:

  • Fuente de datos: Del mismo modo que en el ecosistema básico tendremos una fuente de datos a procesar.
  • Kafka connect source: Sabemos que existe un producer API, pero, al darse cuenta que existen muchas fuentes de datos similares, se decidió crear este proyecto, el cual define un conjunto de conectores a fuentes de datos comunes para que solo lo configures en tu cluster y este lea la información y la mande a algún topic de kafka.
  • Kafka connect sink: Del mismo modo que en el punto anterior existen sistemas de destino comunes, para esto se creó este proyecto que de igual modo contiene conectores los cuales envían información a esos sistemas destino.
  • Kafka streams: La mayoría de las veces no solo necesitas leer información y escribirla en un sistema destino, sino que deseas realizar algunas transformaciones a los datos, kafka steams permite realizar esas operaciones.
  • Target systems: Del mismo modo que en el ecosistema básico tendremos sistemas destino que recibirán la información que procesamos.

Conceptos básicos

A continuación presentaremos los conceptos básicos que debes entender antes de continuar utilizando Apache Kafka:

  • Topic : Un topic es un flujo de datos del mismo contexto al cuál se le asigna un nombre, en un cluster de Kafka es posible crear multiples topics del mismo modo que en una base de datos es posible crear muchas tablas.
  • Partition : Un topic es dividido en particiones, las cuales son ordenadas, de este modo podemos ver una partición como parte de un topic.
  • Message: Un mensaje es información que colocamos en un topic.
  • Offset: Cada mensaje colocado dentro de una partición recibe un id incremental al que se le conoce como offset.

Si tratamos de visualizar lo anterior se verá del siguiente modo:

Captura de pantalla 2018-03-21 a las 12.24.07 p.m.

Puntos importantes:

  • Un offset solo tiene significado dentro de una partición, si vemos la imagen anterior podemos ver que podemos tener el offset 0 en las 3 particiones, pero esto no significa que el valor dentro de las 3 particiones es el mismo.
  • El orden está garantizado pero solo dentro de la misma partición no en las otras particiones.
  • A diferencia de otros brokers los mensajes que se entregan en Kafka se almacenarán por un periodo de tiempo, por default este periodo es de 2 semanas.
  • Una vez que la información se almacena en una partición esta no se puede cambiar.
  • Puedes tener las particiones que quieras en un topic de Kafka.
  • Tu no colocas los mensajes en una partición en específico, tu lo haces en topics, la asignación de particiones sobre los mensajes se hace de manera aleatoria a menos que tengas una llave.
  • Entre más particiones tengas puedes tener más paralelismo, esto significa que puedes tener más consumers leyendo al mismo tiempo.

En el siguiente post explicaremos como funcionan los brokers, la replicación de los datos, el envío y recepción de mensajes con Java, etc.

Si te gusta el contenido y quieres enterarte cuando realicemos un post nuevo síguenos en nuestras redes sociales https://twitter.com/geeks_mx y https://www.facebook.com/geeksJavaMexico/.

Autor: Alejandro Agapito Bautista

Twitter: @raidentrance

Contacto:raidentrance@gmail.com

Primeros pasos con Apache Kafka en Español !


Apache Kafka es una plataforma distribuida de Streaming utilizada para construir plataformas de procesamiento en tiempo real y permite las siguientes operaciones:

  • Publicar y suscribirse a flujos de información
  • Guardar flujos de información en un modo tolerante a fallas
  • Procesar los flujos de información en tiempo real

Paso 1: Descargando Apache Kafka

Para utilizar apache Kafka debemos descargar el archivo binario del sitio https://kafka.apache.org/downloads, seleccionar la versión deseada (para este ejemplo utilizaremos la versión 0.11.0.1) y en la sección Binary downloads descargar la versión de acuerdo a la versión de Scala con la que contamos en nuestro equipo(Nota, esto solo importa si utilizas Scala, si no lo haces no importa la versión que selecciones).

Una vez que se descargó el archivo correctamente debemos descomprimir el archivo y acceder al folder utilizando los siguientes comandos:

tar -xzf kafka_2.12-0.11.0.1.tgz
cd kafka_2.12-0.11.0.1

Paso 2: Iniciando el servidor

Una vez que nos encontramos en el folder el siguiente paso es iniciar los servidores, digo servidores porque para iniciar Kafka es necesario utilizar Zookeeper. Es posible descargar un zookeeper diferente, pero al descargar el binario de kafka este incluye un zookeeper. Nota este zookeeper será una instancia con un solo nodo, así es que no es conveniente utilizarlo de este modo en ambientes productivos.

Para iniciar zookeeper es necesario ejecutar el siguiente comando:

bin/zookeeper-server-start.sh config/zookeeper.properties

El siguiente paso es iniciar el servidor de Kafka

bin/kafka-server-start.sh config/server.properties

Una vez ejecutados los pasos anteriores tendremos un servidor de Kafka funcionando y listo para utilizarse.

Paso 3: Creando topics a utilizar

Una vez que tenemos el servidor de Kafka funcionando el siguiente paso antes de enviar y recibir mensajes es crear un topic, Un topic es una categoría de mensajes y será utilizado para almacenar los mensajes de una aplicación que pertenecen al mismo contexto. Para crear un topic ejecutaremos el siguiente comando:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

Analizando el comando anterior:

  • bin/kafka-topics.sh : Es el script sh que se utilizará para crear un topic en kafka
  • –create : Indica la operación a realizar, en este caso queremos crear un topic de kafka
  • –zookeeper : Como se puede observar no se utilizará la url de kafka para crear el topic, en su lugar utilizaremos la de zookeeper.
  • –replication-factor 1 : Con un replication factor N indicaremos que nuestro server soportará N-1 caídas antes de perder algún registro. En este caso solo contamos con un nodo en nuestro cluster así que si este cluster se cae perderemos la información almacenada.
  • –partitions 1 : Por cada topic, Kafka mantiene un log particionado, cada partición es una ordenada e inmutable secuencia de registros. De este modo los mensajes tienen asignado un número secuencial llamado offset que identifica a cada mensaje dentro de una partición.

Paso 4: Enviar y recibir mensajes

Una vez que contamos con el servidor y el topic que utilizaremos, el siguiente paso será empezar a enviar y recibir mensajes, para esto necesitaremos de dos componentes, un consumer y un producer, a continuación se presenta como ejecutar cada uno de ellos:

Consumer:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

Con el comando anterior seremos capaces de crear un consumer para el topic que creamos, ahora analicemos el comando:

  • bin/kafka-console-consumer.sh : Script sh que permite leer mensajes de Kafka
  • –bootstrap-server : Servidor de Kafka de donde se leerán los mensajes, puede recibir más de uno
  • –topic : Topic del cual deseamos enviar y recibir mensajes

Producer:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

Con el comando anterior seremos capaces de crear un producer para enviar mensajes a los topic de Kafka, ahora analicemos el comando:

  • bin/kafka-console-producer.sh : Script sh que permite enviar mensajes a Kafka
  • –broker-list : Lista de servidores de Kafka que recibirán los mensajes, puede recibir más de uno
  • –topic : Topic al cual se depositarán los mensajes leidos

Paso 4: Viendo todo junto

A continuación se presenta como se ve todo junto funcionando:

Captura de pantalla 2017-10-04 a las 4.00.54 p.m..png

Casos de uso para Kafka

A continuación se presentan algunos casos de uso para Apache Kafka:

  • Messaging: Kafka funciona muy bien al reemplazar sistemas de mensajes. Estos sistemas de mensajes son utilizados para desacoplar procesos de los productores de los datos, generar un buffer de mensajes que aún no se procesarán (Cuando la demanda de trabajo es más que la capacidad de procesamiento), etc. En comparación con los sistemas de mensajes actuales, Kafka cuenta con un mejor throughput(rendimiento), replicación, tolerancia a fallas y está construido en base a particiones lo cual lo hace una muy buena solución para el procesamiento de trabajo a gran escala.
  •  Web site activity tracking (Monitoreo de la actividad de los usuarios en un sitio web): Este es el caso de uso por el que Kafka fue creado originalmente, de este modo los sitios web publican en topics en tiempo real la actividad en el sitio, las interacciones de los usuarios, las búsquedas, etc.
  • Stream processing(Procesamiento de flujos):El procesamiento de datos en diferentes pasos es un uso común de Kafka, leer datos de un sitio, colocarlos en un topic, leerlo en tiempo real y realizar procesamiento, transformaciones, agregaciones, etc. Una herramienta muy común para integrarla con esto es apache Storm, para saber más sobre esto te recomendamos nuestro post Procesamiento en tiempo real de Tweets utilizando Apache Storm en español.

Estos son solo algunos casos de estudio, existe un artículo que entra en más detalles sobre esto en el siguiente enlace The Log: What every software engineer should know about real-time data’s unifying abstraction.

Autor: Alejandro Agapito Bautista

Twitter: @raidentrance

Contacto:raidentrance@gmail.com