In the previous post, Kafka streams: Primera aplicación con Kafka streams, we learned how to write our first Kafka Streams application and run it from our own machine. An application meant for production, however, should run on a server, so in this post we will see how to package it so it can be deployed anywhere and run against a Kafka cluster.
Step 1: Create a fat JAR
Java applications are packaged into files called JARs (Java ARchive). The problem is that the default build process produces an archive containing only our application's code, without any of its dependencies (kafka-streams, slf4j, etc.).
A fat JAR is a JAR file that includes our application's code together with every dependency it needs. Because it is self-contained, we can run it anywhere.
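A quick way to tell a plain JAR from a fat JAR is to look inside it: a fat JAR contains the .class files of its dependencies (packages like org/apache/kafka/ or org/slf4j/), not just our own code. Since a JAR is an ordinary ZIP archive, we can sketch this check in a few lines of Python (the helper name and the package prefixes are just illustrative, not part of the example project):

```python
import zipfile

def bundles_package(jar_path: str, package_prefix: str) -> bool:
    """Return True if the JAR contains compiled classes under the given package.

    A JAR is a regular ZIP file, so we can list its entries directly.
    A prefix like "org/slf4j/" should match entries in a fat JAR that
    bundles slf4j, but not in a plain application JAR.
    """
    with zipfile.ZipFile(jar_path) as jar:
        return any(
            name.startswith(package_prefix) and name.endswith(".class")
            for name in jar.namelist()
        )
```

For example, calling bundles_package("target/kafka-streams-example-0.0.1-SNAPSHOT-jar-with-dependencies.jar", "org/apache/kafka/") should return True for the fat JAR and False for the plain application JAR.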
Take a look at the following pom.xml: it adds a Maven plugin that generates our fat JAR and lets us specify which class is the application's entry point.
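The post's exact pom.xml is not reproduced here, but a typical setup uses the maven-assembly-plugin with the jar-with-dependencies descriptor, which is also what produces the -jar-with-dependencies suffix on the JAR name used later in this post. This is only a sketch; the main class com.example.StreamsApp is a placeholder for your own entry point:

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-assembly-plugin</artifactId>
  <configuration>
    <descriptorRefs>
      <!-- produces the *-jar-with-dependencies.jar artifact -->
      <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
    <archive>
      <manifest>
        <!-- placeholder: replace with your application's main class -->
        <mainClass>com.example.StreamsApp</mainClass>
      </manifest>
    </archive>
  </configuration>
  <executions>
    <execution>
      <id>make-assembly</id>
      <phase>package</phase>
      <goals>
        <goal>single</goal>
      </goals>
    </execution>
  </executions>
</plugin>
```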
Once that is done, we simply run the following command from the terminal:
mvn clean package
Testing our fat JAR
To test the fat JAR, we just run the following command from the terminal:
java -jar target/kafka-streams-example-0.0.1-SNAPSHOT-jar-with-dependencies.jar
Output:
INFO StreamsConfig values: application.id = stream-app application.server = bootstrap.servers = [127.0.0.1:9092] buffered.records.per.partition = 1000 cache.max.bytes.buffering = 10485760 client.id = commit.interval.ms = 30000 connections.max.idle.ms = 540000 default.key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp default.value.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde key.serde = null metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 num.standby.replicas = 0 num.stream.threads = 1 partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper poll.ms = 100 processing.guarantee = at_least_once receive.buffer.bytes = 32768 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 replication.factor = 1 request.timeout.ms = 40000 retry.backoff.ms = 100 rocksdb.config.setter = null security.protocol = PLAINTEXT send.buffer.bytes = 131072 state.cleanup.delay.ms = 600000 state.dir = /tmp/kafka-streams timestamp.extractor = null value.serde = null windowstore.changelog.additional.retention.ms = 86400000 zookeeper.connect = (org.apache.kafka.streams.StreamsConfig:223)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] Creating consumer client (org.apache.kafka.streams.processor.internals.StreamThread:473)
INFO ConsumerConfig values: auto.commit.interval.ms = 5000 auto.offset.reset = earliest bootstrap.servers = [127.0.0.1:9092] check.crcs = true client.id = stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1-consumer connections.max.idle.ms = 540000 enable.auto.commit = false exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1 group.id = stream-app heartbeat.interval.ms = 3000 interceptor.classes = null internal.leave.group.on.close = false isolation.level = read_uncommitted key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 2147483647 max.poll.records = 1000 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partition.assignment.strategy = [org.apache.kafka.streams.processor.internals.StreamPartitionAssignor] receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 305000 retry.backoff.ms = 100 sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT send.buffer.bytes = 131072 session.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer (org.apache.kafka.clients.consumer.ConsumerConfig:223)
INFO Kafka version : 0.11.0.0 (org.apache.kafka.common.utils.AppInfoParser:83)
INFO Kafka commitId : cb8625948210849f (org.apache.kafka.common.utils.AppInfoParser:84)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] Creating restore consumer client (org.apache.kafka.streams.processor.internals.StreamThread:483)
INFO ConsumerConfig values: auto.commit.interval.ms = 5000 auto.offset.reset = earliest bootstrap.servers = [127.0.0.1:9092] check.crcs = true client.id = stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1-restore-consumer connections.max.idle.ms = 540000 enable.auto.commit = false exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1 group.id = heartbeat.interval.ms = 3000 interceptor.classes = null internal.leave.group.on.close = false isolation.level = read_uncommitted key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 2147483647 max.poll.records = 1000 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 305000 retry.backoff.ms = 100 sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT send.buffer.bytes = 131072 session.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer (org.apache.kafka.clients.consumer.ConsumerConfig:223)
INFO Kafka version : 0.11.0.0 (org.apache.kafka.common.utils.AppInfoParser:83)
INFO Kafka commitId : cb8625948210849f (org.apache.kafka.common.utils.AppInfoParser:84)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] State transition from CREATED to RUNNING. (org.apache.kafka.streams.processor.internals.StreamThread:980)
INFO stream-client [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649] State transition from CREATED to RUNNING. (org.apache.kafka.streams.KafkaStreams:229)
INFO stream-client [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649] Started Kafka Stream process (org.apache.kafka.streams.KafkaStreams:453)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] Starting (org.apache.kafka.streams.processor.internals.StreamThread:523)
INFO Discovered coordinator 192.168.14.141:9092 (id: 2147483647 rack: null) for group stream-app. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:597)
INFO Revoking previously assigned partitions [] for group stream-app (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:419)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] at state RUNNING: partitions [] revoked at the beginning of consumer rebalance. current assigned active tasks: [] current assigned standby tasks: [] (org.apache.kafka.streams.processor.internals.StreamThread:205)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] State transition from RUNNING to PARTITIONS_REVOKED. (org.apache.kafka.streams.processor.internals.StreamThread:980)
INFO stream-client [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649] State transition from RUNNING to REBALANCING. (org.apache.kafka.streams.KafkaStreams:229)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] Updating suspended tasks to contain active tasks [] (org.apache.kafka.streams.processor.internals.StreamThread:1400)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] Removing all active tasks [] (org.apache.kafka.streams.processor.internals.StreamThread:1407)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] Removing all standby tasks [] (org.apache.kafka.streams.processor.internals.StreamThread:1421)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] partition revocation took 1 ms. suspended active tasks: [] suspended standby tasks: [] previous active tasks: [] (org.apache.kafka.streams.processor.internals.StreamThread:227)
INFO (Re-)joining group stream-app (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:432)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] Constructed client metadata {d85064bc-a649-43bd-b74a-ba6fc5f51649=ClientMetadata{hostInfo=null, consumers=[stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1-consumer-c55d36b5-a9ba-4832-aa11-c3469137c7ab], state=[activeTasks: ([]) standbyTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevAssignedTasks: ([0_0, 0_1, 1_0, 1_1]) capacity: 1]}} from the member subscriptions. (org.apache.kafka.streams.processor.internals.StreamPartitionAssignor:316)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] Completed validating internal topics in partition assignor (org.apache.kafka.streams.processor.internals.StreamPartitionAssignor:672)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] Completed validating internal topics in partition assignor (org.apache.kafka.streams.processor.internals.StreamPartitionAssignor:672)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] Assigned tasks to clients as {d85064bc-a649-43bd-b74a-ba6fc5f51649=[activeTasks: ([0_0, 0_1, 1_0, 1_1]) standbyTasks: ([]) assignedTasks: ([0_0, 0_1, 1_0, 1_1]) prevActiveTasks: ([]) prevAssignedTasks: ([0_0, 0_1, 1_0, 1_1]) capacity: 1]}. (org.apache.kafka.streams.processor.internals.StreamPartitionAssignor:493)
INFO Successfully joined group stream-app with generation 8 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:399)
INFO Setting newly assigned partitions [input-topic-0, input-topic-1, stream-app-KSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-1, stream-app-KSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-0] for group stream-app (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:262)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] at state PARTITIONS_REVOKED: new partitions [input-topic-0, input-topic-1, stream-app-KSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-1, stream-app-KSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-0] assigned at the end of consumer rebalance. assigned active tasks: [0_0, 0_1, 1_0, 1_1] assigned standby tasks: [] current suspended active tasks: [] current suspended standby tasks: [] previous active tasks: [] (org.apache.kafka.streams.processor.internals.StreamThread:160)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS. (org.apache.kafka.streams.processor.internals.StreamThread:980)
INFO stream-client [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649] State transition from REBALANCING to REBALANCING. (org.apache.kafka.streams.KafkaStreams:229)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] Adding assigned tasks as active {0_0=[input-topic-0], 0_1=[input-topic-1], 1_0=[stream-app-KSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-0], 1_1=[stream-app-KSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-1]} (org.apache.kafka.streams.processor.internals.StreamThread:1280)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] Creating active task 0_0 with assigned partitions [[input-topic-0]] (org.apache.kafka.streams.processor.internals.StreamThread:1229)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] Creating shared producer client (org.apache.kafka.streams.processor.internals.StreamThread:1263)
INFO ProducerConfig values: acks = 1 batch.size = 16384 bootstrap.servers = [127.0.0.1:9092] buffer.memory = 33554432 client.id = stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1-producer compression.type = none connections.max.idle.ms = 540000 enable.idempotence = false interceptor.classes = null key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer linger.ms = 100 max.block.ms = 60000 max.in.flight.requests.per.connection = 5 max.request.size = 1048576 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner receive.buffer.bytes = 32768 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retries = 10 retry.backoff.ms = 100 sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT send.buffer.bytes = 131072 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS transaction.timeout.ms = 60000 transactional.id = null value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer (org.apache.kafka.clients.producer.ProducerConfig:223)
INFO Kafka version : 0.11.0.0 (org.apache.kafka.common.utils.AppInfoParser:83)
INFO Kafka commitId : cb8625948210849f (org.apache.kafka.common.utils.AppInfoParser:84)
INFO task [0_0] Created state store manager for task 0_0 with the acquired state dir lock (org.apache.kafka.streams.processor.internals.ProcessorStateManager:122)
INFO task [0_0] Register global stores [] (org.apache.kafka.streams.processor.internals.ProcessorStateManager:352)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] Created active task 0_0 with assigned partitions [input-topic-0] (org.apache.kafka.streams.processor.internals.StreamThread:1248)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] Creating active task 0_1 with assigned partitions [[input-topic-1]] (org.apache.kafka.streams.processor.internals.StreamThread:1229)
INFO task [0_1] Created state store manager for task 0_1 with the acquired state dir lock (org.apache.kafka.streams.processor.internals.ProcessorStateManager:122)
INFO task [0_1] Register global stores [] (org.apache.kafka.streams.processor.internals.ProcessorStateManager:352)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] Created active task 0_1 with assigned partitions [input-topic-1] (org.apache.kafka.streams.processor.internals.StreamThread:1248)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] Creating active task 1_0 with assigned partitions [[stream-app-KSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-0]] (org.apache.kafka.streams.processor.internals.StreamThread:1229)
INFO task [1_0] Created state store manager for task 1_0 with the acquired state dir lock (org.apache.kafka.streams.processor.internals.ProcessorStateManager:122)
INFO task [1_0] Register global stores [] (org.apache.kafka.streams.processor.internals.ProcessorStateManager:352)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] Created active task 1_0 with assigned partitions [stream-app-KSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-0] (org.apache.kafka.streams.processor.internals.StreamThread:1248)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] Creating active task 1_1 with assigned partitions [[stream-app-KSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-1]] (org.apache.kafka.streams.processor.internals.StreamThread:1229)
INFO task [1_1] Created state store manager for task 1_1 with the acquired state dir lock (org.apache.kafka.streams.processor.internals.ProcessorStateManager:122)
INFO task [1_1] Register global stores [] (org.apache.kafka.streams.processor.internals.ProcessorStateManager:352)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] Created active task 1_1 with assigned partitions [stream-app-KSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-1] (org.apache.kafka.streams.processor.internals.StreamThread:1248)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] Starting restoring state stores from changelog topics [] (org.apache.kafka.streams.processor.internals.StoreChangelogReader:121)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] Adding assigned standby tasks {} (org.apache.kafka.streams.processor.internals.StreamThread:1345)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] State transition from ASSIGNING_PARTITIONS to RUNNING. (org.apache.kafka.streams.processor.internals.StreamThread:980)
INFO stream-client [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649] State transition from REBALANCING to RUNNING. (org.apache.kafka.streams.KafkaStreams:229)
INFO stream-thread [stream-app-d85064bc-a649-43bd-b74a-ba6fc5f51649-StreamThread-1] partition assignment took 147 ms. current active tasks: [0_0, 0_1, 1_0, 1_1] current standby tasks: [] (org.apache.kafka.streams.processor.internals.StreamThread:193)
This shows that our application was built correctly and is ready to be deployed to our servers, from which it will connect to the Kafka cluster.
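On a real server we normally would not keep java -jar running in a foreground terminal. One common option (just a sketch, with every path, user, and unit name a placeholder) is a systemd service, so the application starts on boot and restarts on failure:

```ini
# /etc/systemd/system/stream-app.service -- all paths and names are placeholders
[Unit]
Description=Kafka Streams example application
After=network.target

[Service]
User=kafka
ExecStart=/usr/bin/java -jar /opt/stream-app/kafka-streams-example-0.0.1-SNAPSHOT-jar-with-dependencies.jar
Restart=on-failure

[Install]
WantedBy=multi-user.target
```

After installing the unit file, running sudo systemctl daemon-reload and then sudo systemctl enable --now stream-app would start the application and keep it running.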
If you want to see the complete code, it is available at https://github.com/raidentrance/kafka-streams-example.
In this example we covered the basics of Kafka Streams; in future posts we will look at increasingly complex examples. Follow us on our social networks to hear about new content: https://www.facebook.com/devs4j/ and https://twitter.com/devs4j.