Apache Kafka: Ferramenta de mensageria de alta volumetria

Guia prático do Apache Kafka com Spring Boot: conceitos fundamentais, dependências Gradle, configuração YAML, exemplos de Producer e Consumer, boas práticas e instruções para execução local com Docker.

🧐 O que é? Qual o uso?

O Apache Kafka é uma plataforma de mensageria distribuída, open source, projetada para alta capacidade de envio e processamento de mensagens. Possui alta tolerância a falhas e é utilizada por grandes empresas que enfrentam desafios com grandes volumes de dados.

Conceitos Fundamentais

Antes de começar, é importante entender alguns conceitos básicos do Kafka:

  • Tópico: Categoria ou canal para onde as mensagens são publicadas
  • Partição: Divisão de um tópico que permite paralelismo e escalabilidade
  • Producer: Aplicação que publica mensagens em um tópico
  • Consumer: Aplicação que consome mensagens de um tópico
  • Consumer Group: Grupo de consumidores que trabalham juntos para consumir mensagens de um tópico, permitindo balanceamento de carga e tolerância a falhas
  • Offset: Posição de uma mensagem dentro de uma partição
  • Broker: Servidor Kafka que armazena e serve dados

Kafka vs Filas Tradicionais

Característica Filas Tradicionais Apache Kafka
Persistência Temporária Persistente
Escalabilidade Limitada Alta (horizontal)
Throughput Baixo/Médio Alto
Ordem Garantida por fila Garantida por partição
Replay Não suportado Suportado (via offsets)

Casos de Uso Típicos

  • Event sourcing: Captura de eventos para reconstrução do estado
  • Streaming em tempo real: Processamento contínuo de dados
  • Log aggregation: Centralização de logs de múltiplos serviços
  • Mensageria entre microservices: Comunicação assíncrona e desacoplada
  • Pipeline de dados: ETL e processamento de dados em batch

🛠️ Exemplo: Criando uma aplicação Spring Boot com Spring Kafka

Veja um passo a passo básico para criar uma aplicação simples usando Spring Boot e Spring Kafka:

1️⃣ Adicione as dependências no build.gradle:

1
2
3
4
5
6
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework.kafka:spring-kafka'
    implementation 'com.fasterxml.jackson.core:jackson-databind'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
}

2️⃣ Configure o Kafka no application.yml:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: exemplo-grupo
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      enable-auto-commit: false
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all
      retries: 3

Explicação das propriedades:

  • bootstrap-servers: Endereço do(s) servidor(es) Kafka a serem conectados
  • consumer.group-id: Identifica o grupo de consumidores (permite balanceamento de carga)
  • consumer.auto-offset-reset: Define o que fazer se não houver offset salvo:
    • earliest: Começa do início do tópico
    • latest: Começa das mensagens mais recentes
    • none: Lança exceção se não houver offset salvo
  • consumer.key-deserializer e consumer.value-deserializer: Classes responsáveis por transformar os bytes recebidos em objetos Java
  • producer.key-serializer e producer.value-serializer: Classes responsáveis por transformar objetos Java em bytes para envio
  • consumer.enable-auto-commit: Desabilita auto-commit para processamento atômico
  • producer.acks: all garante que a mensagem foi replicada para todos os brokers
  • producer.retries: Número de tentativas em caso de falha

3️⃣ Exemplo de Producer (publicador):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// ProducerService.java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class ProducerService {
    private static final Logger logger = LoggerFactory.getLogger(ProducerService.class);
    
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message)
            .addCallback(
                result -> logger.info("Mensagem enviada para o tópico {}: {}", topic, message),
                failure -> logger.error("Falha ao enviar mensagem para o tópico {}", topic, failure)
            );
    }
}

Este é um exemplo de Produtor, que envia mensagens para um tópico Kafka. O KafkaTemplate é usado para enviar mensagens de forma assíncrona. O callback permite lidar com sucesso e falhas de forma robusta.

4️⃣ Exemplo de Consumer (assinante):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// ConsumerService.java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
public class ConsumerService {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerService.class);

    @KafkaListener(topics = "meu-topico", groupId = "exemplo-grupo")
    public void listen(
            @Payload String message,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
            @Header(KafkaHeaders.OFFSET) long offset,
            Acknowledgment acknowledgment
    ) {
        try {
            logger.info("Mensagem recebida | Tópico: {} | Partição: {} | Offset: {} | Conteúdo: {}",
                topic, partition, offset, message);
            
            // Processar a mensagem aqui
            
            // Confirma o processamento da mensagem
            acknowledgment.acknowledge();
        } catch (Exception e) {
            logger.error("Erro ao processar mensagem", e);
            // Em produção, você pode optar por não fazer acknowledge
            // para que a mensagem seja reprocessada
        }
    }
}

Este é um exemplo de Consumidor, que escuta mensagens de um tópico específico. O método listen será chamado sempre que uma nova mensagem for recebida. O uso de Acknowledgment permite controle fino sobre quando a mensagem é considerada processada.

🐳 Rodando Kafka com Docker

Para rodar o Kafka localmente, você pode usar o Docker. Lembre-se de iniciar o Kafka localmente antes de rodar a aplicação.

Opção 1: Usando docker-compose

Crie o arquivo docker-compose.yml na raiz do projeto:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'

Inicie os serviços com:

1
docker-compose up -d

Opção 2: Comandos Docker diretos

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
# Iniciar Zookeeper
docker run -d --name zookeeper -p 2181:2181 confluentinc/cp-zookeeper:7.5.0

# Iniciar Kafka
docker run -d --name kafka -p 9092:9092 \
  -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR:1 \
  --link zookeeper:zookeeper \
  confluentinc/cp-kafka:7.5.0

Validar funcionamento

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
# Listar tópicos
docker exec kafka kafka-topics --list --bootstrap-server localhost:9092

# Criar um tópico manualmente
docker exec kafka kafka-topics --create --topic meu-topico --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1

# Listar configurações do tópico
docker exec kafka kafka-topics --describe --topic meu-topico --bootstrap-server localhost:9092

# Ver mensagens em um tópico
docker exec kafka kafka-console-consumer --topic meu-topico --from-beginning --bootstrap-server localhost:9092

Parar os serviços

1
2
3
4
5
6
# Se usando docker-compose
docker-compose down

# Se usando comandos diretos
docker stop kafka zookeeper
docker rm kafka zookeeper

✅ Testando a aplicação

Criar um endpoint para envio de mensagens

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
@RestController
@RequestMapping("/api/messages")
public class MessageController {

    @Autowired
    private ProducerService producerService;

    @PostMapping
    public ResponseEntity<String> sendMessage(@RequestBody String message) {
        producerService.sendMessage("meu-topico", message);
        return ResponseEntity.ok("Mensagem enviada com sucesso");
    }
}

Testar com curl

1
curl -X POST http://localhost:8080/api/messages -H "Content-Type: application/json" -d "Olá Kafka!"

Você deve ver a mensagem sendo logada pelo consumer.

🔧 Boas Práticas

Para Produtor

  • Use acks: all em ambientes de produção para garantir durabilidade
  • Configure retries para lidar com falhas transitórias
  • Habilite compressão para grandes volumes de dados
  • Use particionamento quando precisar de paralelismo
  • Implemente idempotência no lado do consumidor

Para Consumidor

  • Desabilite auto-commit (enable-auto-commit: false) para processamento atômico
  • Use acknowledgment explícito para confirmar processamento
  • Configure timeouts adequados para evitar timeouts de sessão
  • Monitore lags para identificar consumidores lentos
  • Implemente Dead Letter Queue para mensagens que não podem ser processadas

Exemplo de Dead Letter Queue

1
2
3
4
5
6
7
spring:
  kafka:
    consumer:
      properties:
        spring.json.trusted.packages: "*"
    listener:
      ack-mode: manual_immediate
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Service
public class ConsumerService {
    
    @KafkaListener(
        topics = "meu-topico", 
        groupId = "exemplo-grupo",
        errorHandler = "kafkaErrorHandler"
    )
    public void listen(String message, Acknowledgment acknowledgment) {
        try {
            // Processar mensagem
            acknowledgment.acknowledge();
        } catch (Exception e) {
            // Falha ao processar - não acknowledge para reprocessar
            // ou enviar para Dead Letter Queue manualmente
            throw e;
        }
    }
}

@Bean
public KafkaListenerErrorHandler kafkaErrorHandler() {
    return (message, exception) -> {
        log.error("Erro no consumer: {}", exception.getMessage());
        // Lógica de tratamento de erro
        return null;
    };
}

🔍 Troubleshooting

Problemas Comuns

  1. Connection refused: Verifique se o Kafka está rodando e a porta está correta
  2. Consumer não recebe mensagens: Verifique o group-id e auto-offset-reset
  3. Offset out of range: O tópico pode ter sido recriado, considere usar earliest
  4. SerializationException: Verifique se os serializers/deserializers correspondem

Logs úteis para debugging

1
2
3
4
logging:
  level:
    org.springframework.kafka: DEBUG
    org.apache.kafka: INFO

🔗 Mais informações

🎥 Vídeos

Para começar, vale assistir a este material do O Hipsters ponto Tube.
É um conteúdo compacto e muito interessante:

Para complementar, assista também ao vídeo do Código Fonte TV!
Ele tem 11 minutos e traz um resumo muito bem pensado sobre o Kafka:

comments powered by Disqus
Criado com Hugo
Tema Stack desenvolvido por Jimmy