Kafka Streams: Packaging our application


In the previous post, Kafka Streams: First application with Kafka Streams, we learned how to write our first Kafka Streams application and run it on our own machine. An application headed for production, however, should run on a server, so in this post we will see how to package it so it can be deployed and run against a Kafka cluster.

Step 1: Create a fat JAR

Java applications are packaged into files called JARs (Java ARchives). The problem is that the default build process includes only our application's own code in the archive, not its dependencies (kafka-streams, slf4j, etc.).

A fat JAR is a JAR file that contains not only our application's code but also every dependency it needs. Since it is self-contained, we can run it anywhere.

Take a look at the following pom.xml: it adds a Maven plugin that generates the fat JAR and lets us specify which class is the application's entry point.
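The pom.xml was embedded in the original post and is not reproduced here. As a minimal sketch, the relevant build section likely uses maven-assembly-plugin, since the `*-jar-with-dependencies.jar` suffix of the artifact run below is that plugin's standard descriptor; the main class name is a placeholder for our own entry point:

```xml
<build>
  <plugins>
    <!-- maven-assembly-plugin bundles the application together with all of its
         dependencies into a single fat JAR; the jar-with-dependencies descriptor
         is what produces the *-jar-with-dependencies.jar artifact used below -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-assembly-plugin</artifactId>
      <configuration>
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
        <archive>
          <manifest>
            <!-- placeholder: replace with the application's real main class -->
            <mainClass>com.example.StreamsApp</mainClass>
          </manifest>
        </archive>
      </configuration>
      <executions>
        <!-- bind the assembly goal to the package phase so that
             "mvn clean package" builds the fat JAR automatically -->
        <execution>
          <id>make-assembly</id>
          <phase>package</phase>
          <goals>
            <goal>single</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
```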

With that in place, we simply run the following command from the terminal:

mvn clean package

Testing our fat JAR

To test the fat JAR, we simply run the following command from the terminal:

 java -jar target/kafka-streams-example-0.0.1-SNAPSHOT-jar-with-dependencies.jar

Output:

INFO StreamsConfig values:
	application.id = stream-app
	application.server =
	bootstrap.servers = [127.0.0.1:9092]
	buffered.records.per.partition = 1000
	cache.max.bytes.buffering = 10485760
	client.id =
	commit.interval.ms = 30000
	connections.max.idle.ms = 540000
	default.key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
	default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
	default.value.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
	key.serde = null
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	num.standby.replicas = 0
	num.stream.threads = 1
	partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
	poll.ms = 100
	processing.guarantee = at_least_once
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	replication.factor = 1
	request.timeout.ms = 40000
	retry.backoff.ms = 100
	rocksdb.config.setter = null
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	state.cleanup.delay.ms = 600000
	state.dir = /tmp/kafka-streams
	timestamp.extractor = null
	value.serde = null
	windowstore.changelog.additional.retention.ms = 86400000
	zookeeper.connect =
 (org.apache.kafka.streams.StreamsConfig:223)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] Creating consumer client (org.apache.kafka.streams.processor.internals.StreamThread:473)
INFO ConsumerConfig values:
	auto.commit.interval.ms = 5000
	auto.offset.reset = earliest
	bootstrap.servers = [127.0.0.1:9092]
	check.crcs = true
	client.id = stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1-consumer
	connections.max.idle.ms = 540000
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = stream-app
	heartbeat.interval.ms = 3000
	interceptor.classes = null
	internal.leave.group.on.close = false
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 2147483647
	max.poll.records = 1000
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [org.apache.kafka.streams.processor.internals.StreamPartitionAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 305000
	retry.backoff.ms = 100
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
 (org.apache.kafka.clients.consumer.ConsumerConfig:223)
INFO Kafka version : 0.11.0.0 (org.apache.kafka.common.utils.AppInfoParser:83)
INFO Kafka commitId : cb8625948210849f (org.apache.kafka.common.utils.AppInfoParser:84)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] Creating restore consumer client (org.apache.kafka.streams.processor.internals.StreamThread:483)
INFO ConsumerConfig values:
	auto.commit.interval.ms = 5000
	auto.offset.reset = earliest
	bootstrap.servers = [127.0.0.1:9092]
	check.crcs = true
	client.id = stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1-restore-consumer
	connections.max.idle.ms = 540000
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id =
	heartbeat.interval.ms = 3000
	interceptor.classes = null
	internal.leave.group.on.close = false
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 2147483647
	max.poll.records = 1000
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 305000
	retry.backoff.ms = 100
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
 (org.apache.kafka.clients.consumer.ConsumerConfig:223)
INFO Kafka version : 0.11.0.0 (org.apache.kafka.common.utils.AppInfoParser:83)
INFO Kafka commitId : cb8625948210849f (org.apache.kafka.common.utils.AppInfoParser:84)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] State transition from CREATED to RUNNING. (org.apache.kafka.streams.processor.internals.StreamThread:980)
INFO stream-client [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649] State transition from CREATED to RUNNING. (org.apache.kafka.streams.KafkaStreams:229)
INFO stream-client [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649] Started Kafka Stream process (org.apache.kafka.streams.KafkaStreams:453)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] Starting (org.apache.kafka.streams.processor.internals.StreamThread:523)
INFO Discovered coordinator 192.168.14.141:9092 (id: 2147483647 rack: null) for group stream-app. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:597)
INFO Revoking previously assigned partitions [] for group stream-app (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:419)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] at state RUNNING: partitions [] revoked at the beginning of consumer rebalance.
	current assigned active tasks: []
	current assigned standby tasks: []
 (org.apache.kafka.streams.processor.internals.StreamThread:205)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] State transition from RUNNING to PARTITIONS_REVOKED. (org.apache.kafka.streams.processor.internals.StreamThread:980)
INFO stream-client [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649] State transition from RUNNING to REBALANCING. (org.apache.kafka.streams.KafkaStreams:229)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] Updating suspended tasks to contain active tasks [] (org.apache.kafka.streams.processor.internals.StreamThread:1400)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] Removing all active tasks [] (org.apache.kafka.streams.processor.internals.StreamThread:1407)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] Removing all standby tasks [] (org.apache.kafka.streams.processor.internals.StreamThread:1421)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] partition revocation took 1 ms.
	suspended active tasks: []
	suspended standby tasks: []
	previous active tasks: []
 (org.apache.kafka.streams.processor.internals.StreamThread:227)
INFO (Re-)joining group stream-app (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:432)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] Constructed client metadata {d85064bc-a649-43bd-b74a-ba6fc5f51649=ClientMetadata{hostInfo=null, consumers=[stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1-consumer-c55d36b5-a9ba-4832-aa11-c3469137c7ab], state=[activeTasks: ([]) standbyTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevAssignedTasks: ([0_0, 0_1, 1_0, 1_1]) capacity: 1]}} from the member subscriptions. (org.apache.kafka.streams.processor.internals.StreamPartitionAssignor:316)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] Completed validating internal topics in partition assignor (org.apache.kafka.streams.processor.internals.StreamPartitionAssignor:672)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] Completed validating internal topics in partition assignor (org.apache.kafka.streams.processor.internals.StreamPartitionAssignor:672)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] Assigned tasks to clients as {d85064bc-a649-43bd-b74a-ba6fc5f51649=[activeTasks: ([0_0, 0_1, 1_0, 1_1]) standbyTasks: ([]) assignedTasks: ([0_0, 0_1, 1_0, 1_1]) prevActiveTasks: ([]) prevAssignedTasks: ([0_0, 0_1, 1_0, 1_1]) capacity: 1]}. (org.apache.kafka.streams.processor.internals.StreamPartitionAssignor:493)
INFO Successfully joined group stream-app with generation 8 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:399)
INFO Setting newly assigned partitions [input-topic-0, input-topic-1, stream-app-KSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-1, stream-app-KSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-0] for group stream-app (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:262)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] at state PARTITIONS_REVOKED: new partitions [input-topic-0, input-topic-1, stream-app-KSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-1, stream-app-KSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-0] assigned at the end of consumer rebalance.
	assigned active tasks: [0_0, 0_1, 1_0, 1_1]
	assigned standby tasks: []
	current suspended active tasks: []
	current suspended standby tasks: []
	previous active tasks: [] (org.apache.kafka.streams.processor.internals.StreamThread:160)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS. (org.apache.kafka.streams.processor.internals.StreamThread:980)
INFO stream-client [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649] State transition from REBALANCING to REBALANCING. (org.apache.kafka.streams.KafkaStreams:229)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] Adding assigned tasks as active {0_0=[input-topic-0], 0_1=[input-topic-1], 1_0=[stream-app-KSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-0], 1_1=[stream-app-KSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-1]} (org.apache.kafka.streams.processor.internals.StreamThread:1280)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] Creating active task 0_0 with assigned partitions [[input-topic-0]] (org.apache.kafka.streams.processor.internals.StreamThread:1229)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] Creating shared producer client (org.apache.kafka.streams.processor.internals.StreamThread:1263)
INFO ProducerConfig values:
	acks = 1
	batch.size = 16384
	bootstrap.servers = [127.0.0.1:9092]
	buffer.memory = 33554432
	client.id = stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1-producer
	compression.type = none
	connections.max.idle.ms = 540000
	enable.idempotence = false
	interceptor.classes = null
	key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
	linger.ms = 100
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 10
	retry.backoff.ms = 100
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
 (org.apache.kafka.clients.producer.ProducerConfig:223)
INFO Kafka version : 0.11.0.0 (org.apache.kafka.common.utils.AppInfoParser:83)
INFO Kafka commitId : cb8625948210849f (org.apache.kafka.common.utils.AppInfoParser:84)
INFO task [0_0] Created state store manager for task 0_0 with the acquired state dir lock (org.apache.kafka.streams.processor.internals.ProcessorStateManager:122)
INFO task [0_0] Register global stores [] (org.apache.kafka.streams.processor.internals.ProcessorStateManager:352)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] Created active task 0_0 with assigned partitions [input-topic-0] (org.apache.kafka.streams.processor.internals.StreamThread:1248)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] Creating active task 0_1 with assigned partitions [[input-topic-1]] (org.apache.kafka.streams.processor.internals.StreamThread:1229)
INFO task [0_1] Created state store manager for task 0_1 with the acquired state dir lock (org.apache.kafka.streams.processor.internals.ProcessorStateManager:122)
INFO task [0_1] Register global stores [] (org.apache.kafka.streams.processor.internals.ProcessorStateManager:352)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] Created active task 0_1 with assigned partitions [input-topic-1] (org.apache.kafka.streams.processor.internals.StreamThread:1248)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] Creating active task 1_0 with assigned partitions [[stream-app-KSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-0]] (org.apache.kafka.streams.processor.internals.StreamThread:1229)
INFO task [1_0] Created state store manager for task 1_0 with the acquired state dir lock (org.apache.kafka.streams.processor.internals.ProcessorStateManager:122)
INFO task [1_0] Register global stores [] (org.apache.kafka.streams.processor.internals.ProcessorStateManager:352)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] Created active task 1_0 with assigned partitions [stream-app-KSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-0] (org.apache.kafka.streams.processor.internals.StreamThread:1248)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] Creating active task 1_1 with assigned partitions [[stream-app-KSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-1]] (org.apache.kafka.streams.processor.internals.StreamThread:1229)
INFO task [1_1] Created state store manager for task 1_1 with the acquired state dir lock (org.apache.kafka.streams.processor.internals.ProcessorStateManager:122)
INFO task [1_1] Register global stores [] (org.apache.kafka.streams.processor.internals.ProcessorStateManager:352)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] Created active task 1_1 with assigned partitions [stream-app-KSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-1] (org.apache.kafka.streams.processor.internals.StreamThread:1248)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] Starting restoring state stores from changelog topics [] (org.apache.kafka.streams.processor.internals.StoreChangelogReader:121)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] Adding assigned standby tasks {} (org.apache.kafka.streams.processor.internals.StreamThread:1345)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] State transition from ASSIGNING_PARTITIONS to RUNNING. (org.apache.kafka.streams.processor.internals.StreamThread:980)
INFO stream-client [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649] State transition from REBALANCING to RUNNING. (org.apache.kafka.streams.KafkaStreams:229)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] partition assignment took 147 ms.
	current active tasks: [0_0, 0_1, 1_0, 1_1]
	current standby tasks: [] (org.apache.kafka.streams.processor.internals.StreamThread:193)

This output shows that our application was built correctly and is ready to be deployed to our cluster.
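Note that the configuration in the log above points at 127.0.0.1:9092 with replication.factor = 1, which is fine on a laptop but not on a real cluster. A hypothetical sketch of production overrides, assuming the application loads its StreamsConfig from a properties file (broker addresses and paths are placeholders):

```properties
# must stay stable across deploys; it also doubles as the consumer group id
application.id=stream-app
# placeholder broker list; point at the production cluster instead of localhost
bootstrap.servers=broker1:9092,broker2:9092,broker3:9092
# replicate internal changelog/repartition topics for fault tolerance
replication.factor=3
# local state directory on the server (placeholder path)
state.dir=/var/lib/kafka-streams
```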

If you want to see the complete code, it is available at https://github.com/raidentrance/kafka-streams-example.

This example covered the basics of Kafka Streams; in future posts we will look at increasingly complex examples. Follow us on social media to hear about new content: https://www.facebook.com/devs4j/ and https://twitter.com/devs4j.
