Spring/Kafka

Spring Boot > Kafka > Getting Started with Kafka

Krevis 2024. 10. 28. 11:40

https://spring.io/projects/spring-kafka

 

 

Technologies used

 

 

Installation

https://docs.spring.io/spring-kafka/docs/2.6.12/reference/html/#introduction

 

build.gradle

plugins {
    id "java"
    id "org.springframework.boot" version "2.4.13"
    id "io.spring.dependency-management" version "1.1.6"
    id "io.freefair.lombok" version "8.10.2"
}

group = "learn"
version = "1.0-SNAPSHOT"

repositories {
    mavenCentral()
}

dependencies {
    // ~ Spring
    implementation "org.springframework.boot:spring-boot-starter-test"

    // ~ Kafka
    implementation "org.springframework.kafka:spring-kafka"
    implementation "org.springframework.kafka:spring-kafka-test"
}

test {
    useJUnitPlatform()
}

 

If you do not have a Kafka server installed, you can use the embedded Kafka broker provided by Spring Kafka Test.

https://docs.spring.io/spring-kafka/reference/testing.html

 

Transitive dependencies

 

 

KafkaTestUtils

It provides many static helper methods:

  • Setting producer and consumer properties
  • Consuming records
  • Looking up various record offsets
  • And more

 

 

Using embedded Kafka with a producer, consumer, container, and KafkaTemplate

https://github.com/venzersiz/learn-apache-kafka/blob/main/src/test/java/learn/EmbeddedKafkaTest.java

 

 

Publishing and subscribing to Kafka with KafkaTemplate and @KafkaListener

This section assumes that Kafka is running locally via Docker.

 

application.yml

spring:
  kafka:
    bootstrap-servers: localhost:19092,localhost:29092,localhost:39092
    consumer:
      enable-auto-commit: false # Default: false
    listener:
      ack-mode: batch # Default: batch

 

KafkaPublisher.java

package learn.component;

import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class KafkaPublisher {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public void publish(String topic, String message) {

        kafkaTemplate.send(topic, message);
    }
}

 

KafkaSubscriber.java

package learn.component;

import lombok.Getter;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@Getter
public class KafkaSubscriber {

    // volatile: written by the listener thread, read by the test thread
    private volatile boolean received;

    @KafkaListener(topics = "topic-1", groupId = "test-group")
    public void subscribeTopic1(String message) {

        System.out.println("받은 메시지: " + message);
        received = true;
    }
}

 

ExternalKafkaTest.java

package learn;

import java.util.concurrent.TimeUnit;
import learn.component.KafkaPublisher;
import learn.component.KafkaSubscriber;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class ExternalKafkaTest {

    @Autowired
    KafkaPublisher publisher;

    @Autowired
    KafkaSubscriber subscriber;

    @Test
    void publishAndSubscribe() throws InterruptedException {
        publisher.publish("topic-1", "message-1");

        while (true) {
            if (subscriber.isReceived()) {
                break;
            }

            TimeUnit.SECONDS.sleep(1);
        }
    }
}
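
The polling loop above never gives up, so the test hangs indefinitely if the message is never delivered (for example, when the broker is down). Below is a minimal sketch of a bounded wait in plain Java. `BoundedWait` and `awaitTrue` are hypothetical names introduced here for illustration; they are not part of Spring or JUnit.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper (not part of Spring or JUnit):
// polls a condition until it holds or a deadline passes.
class BoundedWait {

    // Returns true if the condition became true before the timeout, false otherwise.
    static boolean awaitTrue(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // deadline reached without the condition holding
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // The condition becomes true roughly 200 ms after start, well within the 2 s timeout.
        boolean met = awaitTrue(() -> System.currentTimeMillis() - start >= 200, 2_000, 50);
        System.out.println("condition met: " + met);
    }
}
```

In the test above, the loop could then be replaced with something like `assertTrue(BoundedWait.awaitTrue(subscriber::isReceived, 10_000, 100))`, which fails after ten seconds instead of hanging.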

 

 

Creating the producer factory, consumer factory, and container factory beans yourself

It is worth reading through the org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration class.

 

KafkaConfig.java

package learn.config;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties.AckMode;

@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String brokerAddressesAsString;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    @Value("${spring.kafka.listener.ack-mode}")
    private AckMode ackMode;

    @Bean
    public ProducerFactory<String, String> kafkaProducerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddressesAsString);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {

        return new KafkaTemplate<>(producerFactory);
    }

    @Bean
    public ConsumerFactory<String, String> kafkaConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddressesAsString);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
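        // A consumer needs deserializers (StringDeserializer), not the producer-side StringSerializer
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class);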
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public KafkaListenerContainerFactory containerFactory(ConsumerFactory<String, String> consumerFactory) {

        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setAckMode(ackMode);
        return factory;
    }
}

 

https://github.com/venzersiz/learn-apache-kafka/commit/2b079f5eabae6c2965b225b10c7809e047474084

https://github.com/venzersiz/learn-apache-kafka/commit/ece73e6e07e275eb2fb502908a81f600b416fd27

 

 

Let's use KafkaProperties

application.yml

Settings that keep their default values are omitted.

spring:
  kafka:
    bootstrap-servers:
      - localhost:19092
      - localhost:29092
      - localhost:39092
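
The YAML list above and the comma-separated string used earlier (`localhost:19092,localhost:29092,localhost:39092`) describe the same set of brokers. The sketch below shows the conversion between the two forms in plain Java. `BootstrapServers` and its methods are hypothetical names for illustration; the real property binding is done by Spring Boot, not by this code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: converts between the comma-separated and list forms of a broker list.
class BootstrapServers {

    // "host1:port1, host2:port2" -> ["host1:port1", "host2:port2"]
    static List<String> parse(String commaSeparated) {
        List<String> servers = new ArrayList<>();
        for (String part : commaSeparated.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                servers.add(trimmed);
            }
        }
        return servers;
    }

    // ["host1:port1", "host2:port2"] -> "host1:port1,host2:port2"
    static String join(List<String> servers) {
        return String.join(",", servers);
    }

    public static void main(String[] args) {
        List<String> servers = parse("localhost:19092,localhost:29092,localhost:39092");
        System.out.println(servers);
        System.out.println(join(servers));
    }
}
```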

 

KafkaConfig.java

@Configuration
@RequiredArgsConstructor
public class KafkaConfig {

    private final KafkaProperties props;

    @Bean
    public ProducerFactory<String, String> kafkaProducerFactory() {
        return new DefaultKafkaProducerFactory<>(props.buildProducerProperties());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {

        return new KafkaTemplate<>(producerFactory);
    }

    @Bean
    public ConsumerFactory<String, String> kafkaConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(props.buildConsumerProperties());
    }

    @Bean
    public KafkaListenerContainerFactory containerFactory(ConsumerFactory<String, String> consumerFactory) {

        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        return factory;
    }
}

 

Default values in KafkaProperties

  • producer
    • key.serializer: org.apache.kafka.common.serialization.StringSerializer
    • value.serializer: org.apache.kafka.common.serialization.StringSerializer
  • consumer
    • key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
    • value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
    • isolation.level: read_uncommitted

 

 

What if you want to send an object as the message?

KafkaConfig.java

Change every generic value type from String to Object.

 

User.java

@Builder
@Getter
@ToString
public class User {

    private String name;

    private int age;
}

 

KafkaPublisher.java

@Component
@RequiredArgsConstructor
public class KafkaPublisher {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    public void publish(String topic, Object message) {

        kafkaTemplate.send(topic, message);
    }
}

The value type was changed from String to Object.

 

KafkaSubscriber.java

...
    @KafkaListener(topics = "topic-2", groupId = "test-group")
    public void subscribeTopic2(Object message) {

        System.out.println("받은 메시지: " + message);
        received = true;
    }
...

 

ExternalKafkaTest.java

...
    @Test
    void publishAndSubscribe() throws InterruptedException {

        User user = User.builder()
                        .name("산신령")
                        .age(100)
                        .build();

        publisher.publish("topic-2", user);
...

 

Running the test fails with the following error:

Can't convert value of class learn.model.User to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
org.apache.kafka.common.errors.SerializationException: Can't convert value of class learn.model.User to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
Caused by: java.lang.ClassCastException: learn.model.User cannot be cast to java.lang.String
	at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)

 

The error says that the User class cannot be cast to String. To fix it, the Kafka value serializer has to be changed.
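
The ClassCastException in the stack trace above comes from generic type erasure: a serializer written for String still accepts any Object at runtime and only fails when it tries to treat the value as a String. The following is a plain-Java model of that behavior, not Kafka's Serializer API; `Encoder`, `StringEncoder`, and `ErasureDemo` are hypothetical names.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical names throughout; a plain-Java model, not Kafka's Serializer API.
class ErasureDemo {

    // Returns true when handing the value to the String-only encoder fails with ClassCastException.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static boolean failsWithClassCast(Object value) {
        Encoder raw = new StringEncoder(); // the generic type argument is erased at runtime
        try {
            raw.encode(value);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("String value fails: " + failsWithClassCast("message-1"));
        System.out.println("Object value fails: " + failsWithClassCast(new Object()));
    }
}

interface Encoder<T> {
    byte[] encode(T value);
}

// Only knows how to encode String values, like a String-only serializer.
class StringEncoder implements Encoder<String> {
    @Override
    public byte[] encode(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }
}
```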

 

application.yml

spring:
  kafka:
    ...
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

 

 

Running the test now produces a different error:

 

Caused by: org.springframework.messaging.converter.MessageConversionException: 
 Cannot convert from [java.lang.String] to [learn.model.User] for GenericMessage 
 [payload={"name":"산신령","age":100}, headers={kafka_offset=29, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@15a2db34, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=topic-2, kafka_receivedTimestamp=1731464256234, __TypeId__=[B@9ad51a4, kafka_groupId=test-group}]

 

During deserialization, the String payload cannot be converted to the User type. As before, the fix is to change the Kafka value deserializer.

 

application.yml

spring:
  kafka:
...
    consumer:
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer

 

Running the test produces yet another error:

Caused by: java.lang.IllegalArgumentException: 
 The class 'learn.model.User' is not in the trusted packages: 
 [java.util, java.lang]. If you believe this class is safe to deserialize, 
 please provide its name. If the serialization is only done by a trusted source, 
 you can also enable trust all (*).

 

Let's do as the message suggests.

 

application.yml

spring:
  kafka:
...
    consumer:
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json:
          trusted.packages: "*"
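
The error message describes an allow-list: a class may only be deserialized if its package is trusted, and `*` trusts everything. The following is a simplified model of such a check in plain Java, not Spring Kafka's actual implementation; `TrustedPackages` and `isTrusted` are hypothetical names. The default set (`java.util`, `java.lang`) is taken from the error message above.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified model of a trusted-packages allow-list.
class TrustedPackages {

    // Trusted when "*" is present or the class's package is listed (exact match only in this sketch).
    static boolean isTrusted(String className, Set<String> trustedPackages) {
        if (trustedPackages.contains("*")) {
            return true;
        }
        int lastDot = className.lastIndexOf('.');
        String packageName = lastDot < 0 ? "" : className.substring(0, lastDot);
        return trustedPackages.contains(packageName);
    }

    public static void main(String[] args) {
        Set<String> defaults = new HashSet<>(Arrays.asList("java.util", "java.lang"));
        Set<String> trustAll = new HashSet<>(Arrays.asList("*"));
        System.out.println("defaults:  " + isTrusted("learn.model.User", defaults));
        System.out.println("trust all: " + isTrusted("learn.model.User", trustAll));
    }
}
```

Trusting `*` is convenient for a local experiment. As the error message itself hints, it is only appropriate when every producer is a trusted source; otherwise listing just the needed package (here `learn.model`) is the narrower choice.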

 

Yet another error occurs:

Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: 
 Cannot construct instance of `learn.model.User` (no Creators, like default constructor, exist): cannot deserialize from Object value (no delegate- or property-based Creator)
 at [Source: (byte[])"{"name":"산신령","age":100}"; line: 1, column: 2]

 

The User class has no default constructor, so Jackson fails during deserialization.
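
Whether a class offers a constructor that can be called without arguments is easy to check with plain reflection. The sketch below is an illustration only, not Jackson's code; `BuilderOnlyUser` and `NoArgsUser` are hypothetical stand-ins for a class with only an all-arguments constructor and a class that also declares a no-arguments one.

```java
// Hypothetical names; a reflection-based illustration, not Jackson's implementation.
class ConstructorCheck {

    // True if the class declares a constructor without parameters (any access level).
    static boolean hasNoArgConstructor(Class<?> type) {
        try {
            type.getDeclaredConstructor();
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("BuilderOnlyUser: " + hasNoArgConstructor(BuilderOnlyUser.class));
        System.out.println("NoArgsUser:      " + hasNoArgConstructor(NoArgsUser.class));
    }
}

// Only an all-arguments constructor, as a builder-only class would have.
class BuilderOnlyUser {
    private final String name;
    private final int age;

    BuilderOnlyUser(String name, int age) {
        this.name = name;
        this.age = age;
    }
}

// Additionally declares a protected no-arguments constructor.
class NoArgsUser {
    private String name;
    private int age;

    protected NoArgsUser() {
    }

    NoArgsUser(String name, int age) {
        this.name = name;
        this.age = age;
    }
}
```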

 

User.java

@Builder
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@AllArgsConstructor
@Getter
@ToString
public class User {

    private String name;

    private int age;
}

 

The test now passes:

받은 메시지: User(name=산신령, age=100)

 

 

When the published and subscribed types differ

For example, suppose the package name or class name differs between the two sides.

 

User2.java

@NoArgsConstructor(access = AccessLevel.PROTECTED)
@Getter
@ToString
public class User2 {

    private String name;

    private int age;
}

 

KafkaSubscriber.java

...
    @KafkaListener(topics = "topic-3", groupId = "test-group")
    public void subscribeTopic3(User2 user) {

        System.out.println("받은 메시지: " + user);
        received = true;
    }
...

 

Running the test produces an error:

Caused by: org.springframework.messaging.converter.MessageConversionException: 
 Cannot convert from [learn.model.User] to [learn.model.User2] for GenericMessage 
 [payload=User(name=산신령, age=100), headers={kafka_offset=2, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7b50f1da, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=topic-3, kafka_receivedTimestamp=1731466159142, kafka_groupId=test-group}]

 

The error occurs because the class information differs between publishing and subscribing.

 

There are several ways to solve this; here we use the spring.json.type.mapping property.

 

application.yml

spring:
  kafka:
...
    producer:
...
      properties:
        spring.json:
          type.mapping: user:learn.model.User
    consumer:
...
      properties:
        spring:
          json:
...
            type.mapping: user:learn.model.User2
...
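
The idea behind the mapping is that both sides agree on a short token (`user`) instead of a concrete class name: the producer translates its class into the token, and the consumer translates the token into its own class. Below is a simplified model in plain Java, not Spring Kafka's implementation; `TypeMappingSketch` and its methods are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified model of a token-to-class type mapping.
class TypeMappingSketch {

    // Parses "token:className[,token:className...]" into a token -> class name map.
    static Map<String, String> parse(String mapping) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String entry : mapping.split(",")) {
            String[] parts = entry.trim().split(":", 2);
            if (parts.length != 2) {
                throw new IllegalArgumentException("Expected token:className but got: " + entry);
            }
            result.put(parts[0].trim(), parts[1].trim());
        }
        return result;
    }

    // Producer side: class name -> token (falls back to the class name when unmapped).
    static String tokenFor(Map<String, String> mapping, String className) {
        for (Map.Entry<String, String> e : mapping.entrySet()) {
            if (e.getValue().equals(className)) {
                return e.getKey();
            }
        }
        return className;
    }

    // Consumer side: token -> class name (falls back to the token when unmapped).
    static String classFor(Map<String, String> mapping, String token) {
        return mapping.getOrDefault(token, token);
    }

    public static void main(String[] args) {
        Map<String, String> producerMapping = parse("user:learn.model.User");
        Map<String, String> consumerMapping = parse("user:learn.model.User2");

        String token = tokenFor(producerMapping, "learn.model.User");
        System.out.println("token sent:     " + token);
        System.out.println("class resolved: " + classFor(consumerMapping, token));
    }
}
```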

 

The test now passes:

받은 메시지: User2(name=산신령, age=100)

 

 

ErrorHandlingDeserializer

However, when the errors above occurred, an additional log message was also printed:

java.lang.IllegalStateException: This error handler cannot process 
 'SerializationException's directly; 
 please consider configuring an 'ErrorHandlingDeserializer' 
 in the value and/or key deserializer

 

Let's take a look at ErrorHandlingDeserializer.

 

 

An error handler can be attached to each Kafka listener individually.

 

CustomKafkaListenerErrorHandler.java

@Component
public class CustomKafkaListenerErrorHandler implements KafkaListenerErrorHandler {

    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {

        System.out.println("message: " + message);
        System.out.println("exception: " + exception);
        return null;
    }
}

 

KafkaSubscriber.java

...
    @KafkaListener(topics = "topic-3", groupId = "test-group", errorHandler = "customKafkaListenerErrorHandler")
    public void subscribeTopic3(User2 user) {

        System.out.println("받은 메시지: " + user);
        received = true;
    }
...

 

To test this, comment out the producer and consumer spring.json.type.mapping settings configured just above, then run the test.

 

The CustomKafkaListenerErrorHandler log is not printed. Let's fix that.

 

application.yml

spring:
  kafka:
...
    consumer:
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        spring:
          json:
            trusted.packages: "*"
            #type.mapping: user:learn.model.User2
          deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
...

 

ErrorHandlingDeserializer wraps another deserializer that does the actual work. With the configuration above, JsonDeserializer is the delegate.

 

Because it is a wrapper that catches any exception thrown during the actual deserialization and handles it, the KafkaListenerErrorHandler code ends up being executed.
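
The delegate pattern described above can be modeled in a few lines of plain Java. This is a simplified sketch, not the real ErrorHandlingDeserializer; `Decoder`, `SafeDecoder`, and `Result` are hypothetical names. The wrapper calls the delegate and, instead of letting an exception escape, returns a result that carries the failure so a later stage can decide what to do with it.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical names throughout; a model of a delegating, exception-capturing decoder.
class SafeDecoderDemo {

    // Wraps a delegate that parses UTF-8 text as an integer.
    static SafeDecoder<Integer> safeIntDecoder() {
        Decoder<Integer> delegate = bytes -> Integer.parseInt(new String(bytes, StandardCharsets.UTF_8));
        return new SafeDecoder<>(delegate);
    }

    public static void main(String[] args) {
        SafeDecoder<Integer> safe = safeIntDecoder();
        Result<Integer> good = safe.decode("100".getBytes(StandardCharsets.UTF_8));
        Result<Integer> bad = safe.decode("not-a-number".getBytes(StandardCharsets.UTF_8));
        System.out.println("good ok: " + good.isOk() + ", value: " + good.value);
        System.out.println("bad ok:  " + bad.isOk() + ", error: " + bad.error);
    }
}

interface Decoder<T> {
    T decode(byte[] data);
}

// Holds either a decoded value or the exception that prevented decoding.
final class Result<T> {
    final T value;
    final Exception error;

    private Result(T value, Exception error) {
        this.value = value;
        this.error = error;
    }

    static <T> Result<T> ok(T value) {
        return new Result<>(value, null);
    }

    static <T> Result<T> failed(Exception error) {
        return new Result<>(null, error);
    }

    boolean isOk() {
        return error == null;
    }
}

// Delegates the real work and captures any runtime exception instead of rethrowing it.
final class SafeDecoder<T> {
    private final Decoder<T> delegate;

    SafeDecoder(Decoder<T> delegate) {
        this.delegate = delegate;
    }

    Result<T> decode(byte[] data) {
        try {
            return Result.ok(delegate.decode(data));
        } catch (RuntimeException e) {
            return Result.failed(e);
        }
    }
}
```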

 

The CustomKafkaListenerErrorHandler log looks like this:

message: GenericMessage [payload=User(name=산신령, age=100), headers={kafka_offset=8, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@a3411b7, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=topic-3, kafka_receivedTimestamp=1731475283242, kafka_groupId=test-group}]
exception: org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message

 

 

https://github.com/venzersiz/learn-apache-kafka/commit/8cc40e62f46f2181b0626f241371eaec3eb9c11b

 

 

Using multiple Kafka clusters

application.yml

spring:
  kafka:
    a-zone:
      bootstrap-servers:
        - localhost:19092
        - localhost:29092
        - localhost:39092
      producer:
        value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
        properties:
          spring.json:
            trusted.packages: "*"
            type.mapping: user:learn.model.User
      consumer:
        value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
        properties:
          spring:
            json:
              trusted.packages: "*"
              type.mapping: user:learn.model.User2
            deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
    b-zone:
      bootstrap-servers:
        - localhost:18092
        - localhost:28092
        - localhost:38092
      producer:
        value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
        properties:
          spring.json:
            trusted.packages: "*"
            type.mapping: user:learn.model.User
      consumer:
        value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
        properties:
          spring:
            json:
              trusted.packages: "*"
              type.mapping: user:learn.model.User2
            deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer

 

Assume there are two zones, A and B.

 

KafkaConfigForAZone.java

@Configuration
@EnableConfigurationProperties(KafkaPropertiesForAZone.class)
@RequiredArgsConstructor
public class KafkaConfigForAZone {

    private final KafkaPropertiesForAZone props;

    @Bean
    public ProducerFactory<String, Object> kafkaProducerFactoryForAZone() {
        return new DefaultKafkaProducerFactory<>(props.buildProducerProperties());
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplateForAZone(ProducerFactory<String, Object> kafkaProducerFactoryForAZone) {

        return new KafkaTemplate<>(kafkaProducerFactoryForAZone);
    }

    @Bean
    public ConsumerFactory<String, Object> kafkaConsumerFactoryForAZone() {
        return new DefaultKafkaConsumerFactory<>(props.buildConsumerProperties());
    }

    @Bean
    public KafkaListenerContainerFactory containerFactoryForAZone(ConsumerFactory<String, Object> kafkaConsumerFactoryForAZone) {

        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumerFactoryForAZone);
        return factory;
    }

    @Bean
    public KafkaPropertiesForAZone kafkaPropertiesForAZone() {
        return new KafkaPropertiesForAZone();
    }

    @ConfigurationProperties(prefix = "spring.kafka.a-zone")
    @Getter
    @Setter
    static class KafkaPropertiesForAZone extends KafkaProperties {

    }
}

 

KafkaConfigForBZone.java

@Configuration
@EnableConfigurationProperties(KafkaPropertiesForBZone.class)
@RequiredArgsConstructor
public class KafkaConfigForBZone {

    private final KafkaPropertiesForBZone props;

    @Bean
    public ProducerFactory<String, Object> kafkaProducerFactoryForBZone() {
        return new DefaultKafkaProducerFactory<>(props.buildProducerProperties());
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplateForBZone(ProducerFactory<String, Object> kafkaProducerFactoryForBZone) {

        return new KafkaTemplate<>(kafkaProducerFactoryForBZone);
    }

    @Bean
    public ConsumerFactory<String, Object> kafkaConsumerFactoryForBZone() {
        return new DefaultKafkaConsumerFactory<>(props.buildConsumerProperties());
    }

    @Bean
    public KafkaListenerContainerFactory containerFactoryForBZone(ConsumerFactory<String, Object> kafkaConsumerFactoryForBZone) {

        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumerFactoryForBZone);
        return factory;
    }

    @Bean
    public KafkaPropertiesForBZone kafkaPropertiesForBZone() {
        return new KafkaPropertiesForBZone();
    }

    @ConfigurationProperties(prefix = "spring.kafka.b-zone")
    @Getter
    @Setter
    static class KafkaPropertiesForBZone extends KafkaProperties {

    }
}

 

KafkaPublisherForAZone.java

@Component
public class KafkaPublisherForAZone {

    @Resource(name = "kafkaTemplateForAZone")
    private KafkaTemplate<String, Object> kafkaTemplate;

    public void publish(String topic, Object message) {

        kafkaTemplate.send(topic, message);
    }
}

 

KafkaPublisherForBZone.java

@Component
public class KafkaPublisherForBZone {

    @Resource(name = "kafkaTemplateForBZone")
    private KafkaTemplate<String, Object> kafkaTemplate;

    public void publish(String topic, Object message) {

        kafkaTemplate.send(topic, message);
    }
}

 

KafkaSubscriber.java

@Component
@Getter
public class KafkaSubscriber {

    // volatile: written by the listener thread, read by the test thread
    private volatile int receivedCount;

    @KafkaListener(topics = "topic-1", groupId = "test-group")
    public void subscribeTopic1(String message) {

        System.out.println("받은 메시지: " + message);
        receivedCount++;
    }

    @KafkaListener(topics = "topic-2", groupId = "test-group")
    public void subscribeTopic2(User user) {

        System.out.println("받은 메시지: " + user);
        receivedCount++;
    }

    @KafkaListener(topics = "topic-3", groupId = "test-group", errorHandler = "customKafkaListenerErrorHandler")
    public void subscribeTopic3(User2 user) {

        System.out.println("받은 메시지: " + user);
        receivedCount++;
    }
}

 

ExternalKafkaTest.java

@SpringBootTest
class ExternalKafkaTest {

    @Autowired
    KafkaPublisherForAZone publisherForAZone;

    @Autowired
    KafkaPublisherForBZone publisherForBZone;

    @Autowired
    KafkaSubscriber subscriber;

    @Test
    void publishAndSubscribe() throws InterruptedException {

        User user = User.builder()
                        .name("산신령")
                        .age(100)
                        .build();

        publisherForAZone.publish("topic-3", user);
        publisherForBZone.publish("topic-3", user);

        while (true) {
            if (subscriber.getReceivedCount() == 2) {
                break;
            }

            TimeUnit.SECONDS.sleep(1);
        }
    }
}

 

The test publishes two messages and finishes once both have been received.

 

 

Running the test produces an error:

Caused by: org.springframework.beans.factory.NoUniqueBeanDefinitionException: 
 No qualifying bean of type 'org.springframework.boot.autoconfigure.kafka.KafkaProperties' 
 available: expected single matching bean but found 5: 
  kafkaPropertiesForAZone,
  kafka.a-zone-learn.config.KafkaConfigForAZone$KafkaPropertiesForAZone,
  kafkaPropertiesForBZone,
  kafka.b-zone-learn.config.KafkaConfigForBZone$KafkaPropertiesForBZone,
  spring.kafka-org.springframework.boot.autoconfigure.kafka.KafkaProperties

 

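This happens because the Kafka auto-configuration registers its own beans and expects a single KafkaProperties bean. Set the following property to exclude the auto-configuration. The list in the error also shows each zone's properties class registered twice, once by @EnableConfigurationProperties and once by the explicit @Bean method, so one of those two registrations is redundant.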

 

application.yml

spring:
  autoconfigure:
    exclude: org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration
...

 

 

 

References