https://spring.io/projects/spring-kafka
Technologies used
- Gradle
- Java 8
- Spring Boot 2
  - Version 2.4.13 is used here
- Spring Kafka
  - With the Spring Boot version above, spring-kafka 2.6.12 is used
  - https://docs.spring.io/spring-kafka/docs/2.6.12/reference/html/
- Spring Kafka Test
  - Needed to use the embedded Kafka broker
Installation
https://docs.spring.io/spring-kafka/docs/2.6.12/reference/html/#introduction
build.gradle
plugins {
    id "java"
    id "org.springframework.boot" version "2.4.13"
    id "io.spring.dependency-management" version "1.1.6"
    id "io.freefair.lombok" version "8.10.2"
}

group = "learn"
version = "1.0-SNAPSHOT"

repositories {
    mavenCentral()
}

dependencies {
    // ~ Spring
    implementation "org.springframework.boot:spring-boot-starter-test"

    // ~ Kafka
    implementation "org.springframework.kafka:spring-kafka"
    implementation "org.springframework.kafka:spring-kafka-test"
}

test {
    useJUnitPlatform()
}
If you do not have a Kafka server installed, you can use the embedded Kafka broker provided by Spring Kafka Test
https://docs.spring.io/spring-kafka/reference/testing.html
Transitive dependencies


KafkaTestUtils
Provides many static helper methods, for example:
- Setting up producer and consumer properties
- Consuming records
- Looking up various record offsets
- And so on
Using the embedded Kafka with a producer, consumer, container, and KafkaTemplate
https://github.com/venzersiz/learn-apache-kafka/blob/main/src/test/java/learn/EmbeddedKafkaTest.java
Publishing and subscribing to Kafka with KafkaTemplate and @KafkaListener
This section assumes that Kafka has been set up locally with Docker
application.yml
spring:
  kafka:
    bootstrap-servers: localhost:19092,localhost:29092,localhost:39092
    consumer:
      enable-auto-commit: false # Default: false
    listener:
      ack-mode: batch # Default: batch
KafkaPublisher.java
package learn.component;

import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class KafkaPublisher {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public void publish(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}
KafkaSubscriber.java
package learn.component;

import lombok.Getter;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@Getter
public class KafkaSubscriber {

    // volatile: written by the listener thread, read by the test thread
    private volatile boolean received;

    @KafkaListener(topics = "topic-1", groupId = "test-group")
    public void subscribeTopic1(String message) {
        System.out.println("받은 메시지: " + message);
        received = true;
    }
}
ExternalKafkaTest.java
package learn;

import java.util.concurrent.TimeUnit;
import learn.component.KafkaPublisher;
import learn.component.KafkaSubscriber;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class ExternalKafkaTest {

    @Autowired
    KafkaPublisher publisher;

    @Autowired
    KafkaSubscriber subscriber;

    @Test
    void publishAndSubscribe() throws InterruptedException {
        publisher.publish("topic-1", "message-1");

        // Poll once per second until the listener reports a received message
        while (true) {
            if (subscriber.isReceived()) {
                break;
            }
            TimeUnit.SECONDS.sleep(1);
        }
    }
}
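The polling loop in the test above never gives up, so the test hangs if no message ever arrives. A minimal JDK-only sketch of a bounded wait follows. `LatchingReceiver` is a hypothetical stand-in for KafkaSubscriber, and the 5-second timeout is an arbitrary assumption, not something taken from the original project.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for KafkaSubscriber: counts down a latch instead of setting a flag.
class LatchingReceiver {

    private final CountDownLatch latch;

    LatchingReceiver(int expectedMessages) {
        this.latch = new CountDownLatch(expectedMessages);
    }

    // In a real subscriber this body would live in the @KafkaListener method.
    void onMessage(String message) {
        latch.countDown();
    }

    // Returns true if all expected messages arrived before the timeout elapsed.
    boolean awaitAll(long timeout, TimeUnit unit) throws InterruptedException {
        return latch.await(timeout, unit);
    }

    public static void main(String[] args) throws InterruptedException {
        LatchingReceiver receiver = new LatchingReceiver(1);
        // Simulate the listener thread delivering one message.
        new Thread(() -> receiver.onMessage("message-1")).start();
        System.out.println("received in time: " + receiver.awaitAll(5, TimeUnit.SECONDS));
    }
}
```

In a test, the boolean returned by `awaitAll` could be asserted directly, so a missing message shows up as a failure rather than a hang.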
Creating the producer factory, consumer factory, and container factory beans yourself
It is worth reading through the org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration class
KafkaConfig.java
package learn.config;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties.AckMode;

@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String brokerAddressesAsString;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    @Value("${spring.kafka.listener.ack-mode}")
    private AckMode ackMode;

    @Bean
    public ProducerFactory<String, String> kafkaProducerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddressesAsString);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }

    @Bean
    public ConsumerFactory<String, String> kafkaConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddressesAsString);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        // The consumer needs deserializers (not serializers); these keys have no default value
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public KafkaListenerContainerFactory containerFactory(ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setAckMode(ackMode);
        return factory;
    }
}
https://github.com/venzersiz/learn-apache-kafka/commit/2b079f5eabae6c2965b225b10c7809e047474084
https://github.com/venzersiz/learn-apache-kafka/commit/ece73e6e07e275eb2fb502908a81f600b416fd27
Let's use KafkaProperties
application.yml
Settings that are already at their default values are omitted
spring:
  kafka:
    bootstrap-servers:
      - localhost:19092
      - localhost:29092
      - localhost:39092
KafkaConfig.java
@Configuration
@RequiredArgsConstructor
public class KafkaConfig {

    private final KafkaProperties props;

    @Bean
    public ProducerFactory<String, String> kafkaProducerFactory() {
        return new DefaultKafkaProducerFactory<>(props.buildProducerProperties());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }

    @Bean
    public ConsumerFactory<String, String> kafkaConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(props.buildConsumerProperties());
    }

    @Bean
    public KafkaListenerContainerFactory containerFactory(ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        return factory;
    }
}
Default values of KafkaProperties
- producer
  - key.serializer: org.apache.kafka.common.serialization.StringSerializer
  - value.serializer: org.apache.kafka.common.serialization.StringSerializer
- consumer
  - key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
  - value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
  - isolation.level: read_uncommitted
How do you send an object as a message?
KafkaConfig.java
Change every generic value type from String to Object
User.java
@Builder
@Getter
@ToString
public class User {

    private String name;
    private int age;
}
KafkaPublisher.java
@Component
@RequiredArgsConstructor
public class KafkaPublisher {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    public void publish(String topic, Object message) {
        kafkaTemplate.send(topic, message);
    }
}
The value type has been changed from String to Object
KafkaSubscriber.java
...
    // The parameter is typed as User, matching the conversion target reported in the errors below
    @KafkaListener(topics = "topic-2", groupId = "test-group")
    public void subscribeTopic2(User user) {
        System.out.println("받은 메시지: " + user);
        received = true;
    }
...
ExternalKafkaTest.java
...
    @Test
    void publishAndSubscribe() throws InterruptedException {
        User user = User.builder()
                        .name("산신령")
                        .age(100)
                        .build();
        publisher.publish("topic-2", user);
...
Running the test fails with the error below
Can't convert value of class learn.model.User to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
org.apache.kafka.common.errors.SerializationException: Can't convert value of class learn.model.User to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
Caused by: java.lang.ClassCastException: learn.model.User cannot be cast to java.lang.String
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
The error says that the User class cannot be cast to the String class. To fix this, Kafka's value serializer has to be changed
application.yml
spring:
  kafka:
    ...
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
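To see why the String-based serializer rejected a User at runtime rather than at compile time, here is a minimal JDK-only sketch. `ValueSerializer` and `PlainStringSerializer` are simplified, hypothetical stand-ins for Kafka's Serializer and StringSerializer, not the real classes. The serializer is chosen by configuration, so the caller only holds it through an unchecked generic type, and the mismatch surfaces as a ClassCastException when the value is handed over.

```java
import java.nio.charset.StandardCharsets;

class SerializerMismatchSketch {

    // The configured serializer is only known at runtime, so the call goes through
    // an unchecked cast; generics are erased and cannot catch the mismatch earlier.
    @SuppressWarnings("unchecked")
    static String trySerialize(ValueSerializer<?> configured, Object value) {
        try {
            ((ValueSerializer<Object>) configured).serialize(value);
            return "ok";
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        ValueSerializer<?> configured = new PlainStringSerializer();
        System.out.println(trySerialize(configured, "message-1"));  // prints "ok"
        System.out.println(trySerialize(configured, new Object())); // prints "ClassCastException"
    }
}

// Simplified stand-in for a serializer contract (hypothetical, not Kafka's interface).
interface ValueSerializer<T> {
    byte[] serialize(T value);
}

// Simplified stand-in for a String-only serializer.
class PlainStringSerializer implements ValueSerializer<String> {
    @Override
    public byte[] serialize(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }
}
```

The `new Object()` argument plays the role of the User instance: any value that is not a String fails in the same way.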
Running the test now produces a different error
Caused by: org.springframework.messaging.converter.MessageConversionException:
Cannot convert from [java.lang.String] to [learn.model.User] for GenericMessage
[payload={"name":"산신령","age":100}, headers={kafka_offset=29, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@15a2db34, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=topic-2, kafka_receivedTimestamp=1731464256234, __TypeId__=[B@9ad51a4, kafka_groupId=test-group}]
During deserialization, the String type cannot be converted to the User type. To fix this, Kafka's value deserializer has to be changed in much the same way as above
application.yml
spring:
  kafka:
    ...
    consumer:
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
Running the test produces yet another error
Caused by: java.lang.IllegalArgumentException:
The class 'learn.model.User' is not in the trusted packages:
[java.util, java.lang]. If you believe this class is safe to deserialize,
please provide its name. If the serialization is only done by a trusted source,
you can also enable trust all (*).
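Let's do what the message suggests. The trusted packages setting applies to deserialization, so it goes under the consumer properties
application.yml
spring:
  kafka:
    ...
    consumer:
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json:
          trusted.packages: "*"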
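The idea behind the trusted packages error can be sketched with plain Java. This is only an illustration of an allow-list check, not Spring Kafka's actual implementation: the method name `isTrusted` is hypothetical, and the exact-match rule on package names is an assumption made for the example.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

class TrustedPackagesSketch {

    // Returns true when the class may be deserialized under the given trust list.
    // "*" trusts everything; otherwise the class's package must be listed (assumed exact match).
    static boolean isTrusted(String className, Set<String> trustedPackages) {
        if (trustedPackages.contains("*")) {
            return true;
        }
        int lastDot = className.lastIndexOf('.');
        String packageName = lastDot < 0 ? "" : className.substring(0, lastDot);
        return trustedPackages.contains(packageName);
    }

    public static void main(String[] args) {
        Set<String> defaults = new HashSet<>(Arrays.asList("java.util", "java.lang"));
        Set<String> modelOnly = new HashSet<>(Arrays.asList("learn.model"));
        Set<String> trustAll = new HashSet<>(Arrays.asList("*"));

        System.out.println(isTrusted("learn.model.User", defaults));  // prints "false"
        System.out.println(isTrusted("learn.model.User", modelOnly)); // prints "true"
        System.out.println(isTrusted("learn.model.User", trustAll));  // prints "true"
    }
}
```

Under this model, listing only the package that holds the message classes would be a narrower alternative to trusting everything with "*".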
Yet another error occurred
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException:
Cannot construct instance of `learn.model.User` (no Creators, like default constructor, exist): cannot deserialize from Object value (no delegate- or property-based Creator)
at [Source: (byte[])"{"name":"산신령","age":100}"; line: 1, column: 2]
The User class has no default constructor, so Jackson fails when it tries to deserialize into it
User.java
@Builder
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@AllArgsConstructor
@Getter
@ToString
public class User {

    private String name;
    private int age;
}
Now the test passes
받은 메시지: User(name=산신령, age=100)
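The role of the no-argument constructor can be illustrated with plain reflection. This is a rough approximation of what a reflection-based library has to do before it can populate an object, not Jackson's own code. `WithoutDefault` and `WithDefault` are hypothetical classes that mirror the two versions of User.

```java
import java.lang.reflect.Constructor;

class NoArgConstructorSketch {

    // Mirrors the first User: the only constructor takes arguments.
    static class WithoutDefault {
        final String name;

        WithoutDefault(String name) {
            this.name = name;
        }
    }

    // Mirrors the fixed User: a protected no-argument constructor exists.
    static class WithDefault {
        String name;

        protected WithDefault() {
        }
    }

    // Tries to create an instance reflectively through a no-argument constructor.
    static boolean canInstantiate(Class<?> type) {
        try {
            Constructor<?> ctor = type.getDeclaredConstructor();
            ctor.setAccessible(true); // a non-public constructor is usable once made accessible
            ctor.newInstance();
            return true;
        } catch (ReflectiveOperationException e) {
            return false; // NoSuchMethodException when no no-argument constructor is declared
        }
    }

    public static void main(String[] args) {
        System.out.println(canInstantiate(WithoutDefault.class)); // prints "false"
        System.out.println(canInstantiate(WithDefault.class));    // prints "true"
    }
}
```

The sketch also shows why a protected constructor is enough here: visibility can be overridden reflectively, so the constructor does not need to be public.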
When the publisher and subscriber use different type information
For example, suppose the package name or class name differs
User2.java
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@Getter
@ToString
public class User2 {

    private String name;
    private int age;
}
KafkaSubscriber.java
...
    @KafkaListener(topics = "topic-3", groupId = "test-group")
    public void subscribeTopic3(User2 user) {
        System.out.println("받은 메시지: " + user);
        received = true;
    }
...
Running the test produces an error
Caused by: org.springframework.messaging.converter.MessageConversionException:
Cannot convert from [learn.model.User] to [learn.model.User2] for GenericMessage
[payload=User(name=산신령, age=100), headers={kafka_offset=2, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7b50f1da, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=topic-3, kafka_receivedTimestamp=1731466159142, kafka_groupId=test-group}]
The error occurs because the class information differs between publishing and subscribing
There are several ways to solve this; here we use the spring.json.type.mapping property
application.yml
spring:
  kafka:
    ...
    producer:
      ...
      properties:
        spring.json:
          type.mapping: user:learn.model.User
    consumer:
      ...
      properties:
        spring:
          json:
            ...
            type.mapping: user:learn.model.User2
    ...
Now the test passes
받은 메시지: User2(name=산신령, age=100)
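The effect of the two type.mapping settings can be sketched as a pair of lookups around a shared token. This is a JDK-only illustration under stated assumptions, not Spring Kafka's implementation: the method names are hypothetical, and the fallback to the raw class name when no mapping exists is an assumption made for the example.

```java
import java.util.HashMap;
import java.util.Map;

class TypeMappingSketch {

    // Parses "token:className[,token:className...]" into a token -> class name map.
    static Map<String, String> parse(String mapping) {
        Map<String, String> result = new HashMap<>();
        for (String entry : mapping.split(",")) {
            String[] parts = entry.trim().split(":", 2);
            if (parts.length != 2) {
                throw new IllegalArgumentException("bad entry: " + entry);
            }
            result.put(parts[0].trim(), parts[1].trim());
        }
        return result;
    }

    // Producer side: find the token to send in the type header for a class.
    static String tokenFor(Map<String, String> mapping, String className) {
        for (Map.Entry<String, String> e : mapping.entrySet()) {
            if (e.getValue().equals(className)) {
                return e.getKey();
            }
        }
        return className; // assumed fallback: send the class name itself
    }

    // Consumer side: resolve the received token to a local class name.
    static String classFor(Map<String, String> mapping, String token) {
        return mapping.getOrDefault(token, token);
    }

    public static void main(String[] args) {
        Map<String, String> producer = parse("user:learn.model.User");
        Map<String, String> consumer = parse("user:learn.model.User2");

        String header = tokenFor(producer, "learn.model.User");
        System.out.println(header + " -> " + classFor(consumer, header)); // prints "user -> learn.model.User2"
    }
}
```

Because both sides agree only on the token `user`, each side is free to bind it to its own class.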
ErrorHandlingDeserializer
However, whenever the errors above occurred, an additional log message was printed as well
java.lang.IllegalStateException: This error handler cannot process
'SerializationException's directly;
please consider configuring an 'ErrorHandlingDeserializer'
in the value and/or key deserializer
Let's take a look at ErrorHandlingDeserializer
An error handler can be attached to each Kafka listener individually
CustomKafkaListenerErrorHandler.java
@Component
public class CustomKafkaListenerErrorHandler implements KafkaListenerErrorHandler {

    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
        System.out.println("message: " + message);
        System.out.println("exception: " + exception);
        return null;
    }
}
KafkaSubscriber.java
...
    @KafkaListener(topics = "topic-3", groupId = "test-group", errorHandler = "customKafkaListenerErrorHandler")
    public void subscribeTopic3(User2 user) {
        System.out.println("받은 메시지: " + user);
        received = true;
    }
...
To try this out, comment out the producer and consumer spring.json.type.mapping settings configured just above, then run the test
The log from CustomKafkaListenerErrorHandler is not printed. Let's fix that
application.yml
spring:
  kafka:
    ...
    consumer:
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        spring:
          json:
            trusted.packages: "*"
            #type.mapping: user:learn.model.User2
          deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
    ...
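ErrorHandlingDeserializer holds an inner deserializer that does the actual work. With the configuration above, JsonDeserializer becomes the delegate
In other words, it is a wrapper class: if an exception is thrown during the actual deserialization, it catches and handles that exception, and as a result the KafkaListenerErrorHandler code gets executed
The log from CustomKafkaListenerErrorHandler looks like this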
message: GenericMessage [payload=User(name=산신령, age=100), headers={kafka_offset=8, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@a3411b7, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=topic-3, kafka_receivedTimestamp=1731475283242, kafka_groupId=test-group}]
exception: org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
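The wrapper-and-delegate relationship described above can be sketched in plain Java. This illustrates only the general pattern of catching a delegate's exception and passing the failure along as data. The `Result` type and the `wrap` method are hypothetical, and the way Spring Kafka actually hands the captured failure to its error handlers is not reproduced here.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

class ErrorCapturingSketch {

    // Outcome of one deserialization attempt: either a value or the captured failure.
    static final class Result<T> {
        final T value;
        final Exception error;

        Result(T value, Exception error) {
            this.value = value;
            this.error = error;
        }

        boolean failed() {
            return error != null;
        }
    }

    // Wraps a delegate so that its exceptions are captured instead of propagating.
    static <T> Function<byte[], Result<T>> wrap(Function<byte[], T> delegate) {
        return bytes -> {
            try {
                return new Result<>(delegate.apply(bytes), null);
            } catch (RuntimeException e) {
                return new Result<>(null, e);
            }
        };
    }

    public static void main(String[] args) {
        // The delegate does the real work and may throw on bad input.
        Function<byte[], Integer> delegate =
            bytes -> Integer.parseInt(new String(bytes, StandardCharsets.UTF_8));
        Function<byte[], Result<Integer>> safe = wrap(delegate);

        System.out.println(safe.apply("100".getBytes(StandardCharsets.UTF_8)).value);             // prints "100"
        System.out.println(safe.apply("not-a-number".getBytes(StandardCharsets.UTF_8)).failed()); // prints "true"
    }
}
```

Since the failure is returned rather than thrown, the caller keeps control and can route a bad record to whatever error handling it has configured.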
https://github.com/venzersiz/learn-apache-kafka/commit/8cc40e62f46f2181b0626f241371eaec3eb9c11b
When using multiple Kafka clusters
application.yml
spring:
  kafka:
    a-zone:
      bootstrap-servers:
        - localhost:19092
        - localhost:29092
        - localhost:39092
      producer:
        value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
        properties:
          spring.json:
            trusted.packages: "*"
            type.mapping: user:learn.model.User
      consumer:
        value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
        properties:
          spring:
            json:
              trusted.packages: "*"
              type.mapping: user:learn.model.User2
            deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
    b-zone:
      bootstrap-servers:
        - localhost:18092
        - localhost:28092
        - localhost:38092
      producer:
        value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
        properties:
          spring.json:
            trusted.packages: "*"
            type.mapping: user:learn.model.User
      consumer:
        value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
        properties:
          spring:
            json:
              trusted.packages: "*"
              type.mapping: user:learn.model.User2
            deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
Assume there are two zones, A and B
KafkaConfigForAZone.java
@Configuration
@EnableConfigurationProperties(KafkaPropertiesForAZone.class)
@RequiredArgsConstructor
public class KafkaConfigForAZone {

    private final KafkaPropertiesForAZone props;

    @Bean
    public ProducerFactory<String, Object> kafkaProducerFactoryForAZone() {
        return new DefaultKafkaProducerFactory<>(props.buildProducerProperties());
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplateForAZone(ProducerFactory<String, Object> kafkaProducerFactoryForAZone) {
        return new KafkaTemplate<>(kafkaProducerFactoryForAZone);
    }

    @Bean
    public ConsumerFactory<String, Object> kafkaConsumerFactoryForAZone() {
        return new DefaultKafkaConsumerFactory<>(props.buildConsumerProperties());
    }

    @Bean
    public KafkaListenerContainerFactory containerFactoryForAZone(ConsumerFactory<String, Object> kafkaConsumerFactoryForAZone) {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumerFactoryForAZone);
        return factory;
    }

    @Bean
    public KafkaPropertiesForAZone kafkaPropertiesForAZone() {
        return new KafkaPropertiesForAZone();
    }

    @ConfigurationProperties(prefix = "spring.kafka.a-zone")
    @Getter
    @Setter
    static class KafkaPropertiesForAZone extends KafkaProperties {
    }
}
KafkaConfigForBZone.java
@Configuration
@EnableConfigurationProperties(KafkaPropertiesForBZone.class)
@RequiredArgsConstructor
public class KafkaConfigForBZone {

    private final KafkaPropertiesForBZone props;

    @Bean
    public ProducerFactory<String, Object> kafkaProducerFactoryForBZone() {
        return new DefaultKafkaProducerFactory<>(props.buildProducerProperties());
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplateForBZone(ProducerFactory<String, Object> kafkaProducerFactoryForBZone) {
        return new KafkaTemplate<>(kafkaProducerFactoryForBZone);
    }

    @Bean
    public ConsumerFactory<String, Object> kafkaConsumerFactoryForBZone() {
        return new DefaultKafkaConsumerFactory<>(props.buildConsumerProperties());
    }

    @Bean
    public KafkaListenerContainerFactory containerFactoryForBZone(ConsumerFactory<String, Object> kafkaConsumerFactoryForBZone) {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumerFactoryForBZone);
        return factory;
    }

    @Bean
    public KafkaPropertiesForBZone kafkaPropertiesForBZone() {
        return new KafkaPropertiesForBZone();
    }

    @ConfigurationProperties(prefix = "spring.kafka.b-zone")
    @Getter
    @Setter
    static class KafkaPropertiesForBZone extends KafkaProperties {
    }
}
KafkaPublisherForAZone.java
@Component
public class KafkaPublisherForAZone {

    @Resource(name = "kafkaTemplateForAZone")
    private KafkaTemplate<String, Object> kafkaTemplate;

    public void publish(String topic, Object message) {
        kafkaTemplate.send(topic, message);
    }
}
KafkaPublisherForBZone.java
@Component
public class KafkaPublisherForBZone {

    @Resource(name = "kafkaTemplateForBZone")
    private KafkaTemplate<String, Object> kafkaTemplate;

    public void publish(String topic, Object message) {
        kafkaTemplate.send(topic, message);
    }
}
KafkaSubscriber.java
@Component
@Getter
public class KafkaSubscriber {

    private int receivedCount;

    @KafkaListener(topics = "topic-1", groupId = "test-group")
    public void subscribeTopic1(String message) {
        System.out.println("받은 메시지: " + message);
        receivedCount++;
    }

    @KafkaListener(topics = "topic-2", groupId = "test-group")
    public void subscribeTopic2(User user) {
        System.out.println("받은 메시지: " + user);
        receivedCount++;
    }

    @KafkaListener(topics = "topic-3", groupId = "test-group", errorHandler = "customKafkaListenerErrorHandler")
    public void subscribeTopic3(User2 user) {
        System.out.println("받은 메시지: " + user);
        receivedCount++;
    }
}
ExternalKafkaTest.java
@SpringBootTest
class ExternalKafkaTest {

    @Autowired
    KafkaPublisherForAZone publisherForAZone;

    @Autowired
    KafkaPublisherForBZone publisherForBZone;

    @Autowired
    KafkaSubscriber subscriber;

    @Test
    void publishAndSubscribe() throws InterruptedException {
        User user = User.builder()
                        .name("산신령")
                        .age(100)
                        .build();

        publisherForAZone.publish("topic-3", user);
        publisherForBZone.publish("topic-3", user);

        while (true) {
            if (subscriber.getReceivedCount() == 2) {
                break;
            }
            TimeUnit.SECONDS.sleep(1);
        }
    }
}
The test publishes the message twice and ends once it has been received twice
Running the test produces an error
Caused by: org.springframework.beans.factory.NoUniqueBeanDefinitionException:
No qualifying bean of type 'org.springframework.boot.autoconfigure.kafka.KafkaProperties'
available: expected single matching bean but found 5:
kafkaPropertiesForAZone,
kafka.a-zone-learn.config.KafkaConfigForAZone$KafkaPropertiesForAZone,
kafkaPropertiesForBZone,
kafka.b-zone-learn.config.KafkaConfigForBZone$KafkaPropertiesForBZone,
spring.kafka-org.springframework.boot.autoconfigure.kafka.KafkaProperties
The problem occurs because the Kafka auto-configuration beans are registered as well. Set the following property so that Kafka is not auto-configured
application.yml
spring:
  autoconfigure:
    exclude: org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration
  ...
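The "expected single matching bean but found 5" message follows from looking up a bean by type when several registered objects are assignable to that type, subclasses included. A minimal JDK-only sketch of that lookup rule is below. It is not Spring's container: the registry map, the class names, and both methods are hypothetical and only model the by-type matching.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class TypeLookupSketch {

    // Stand-ins for KafkaProperties and its two zone-specific subclasses.
    static class BaseProps { }
    static class AZoneProps extends BaseProps { }
    static class BZoneProps extends BaseProps { }

    // Names of all registered objects assignable to the requested type.
    static List<String> namesForType(Map<String, Object> registry, Class<?> type) {
        List<String> names = new ArrayList<>();
        for (Map.Entry<String, Object> e : registry.entrySet()) {
            if (type.isInstance(e.getValue())) {
                names.add(e.getKey());
            }
        }
        return names;
    }

    // Lookup by type succeeds only when exactly one candidate matches.
    static Object getSingle(Map<String, Object> registry, Class<?> type) {
        List<String> names = namesForType(registry, type);
        if (names.size() != 1) {
            throw new IllegalStateException(
                "expected single matching bean but found " + names.size() + ": " + names);
        }
        return registry.get(names.get(0));
    }

    public static void main(String[] args) {
        Map<String, Object> registry = new LinkedHashMap<>();
        registry.put("propsForAZone", new AZoneProps());
        registry.put("propsForBZone", new BZoneProps());
        registry.put("defaultProps", new BaseProps());

        // Asking for the base type matches the subclasses as well.
        System.out.println(namesForType(registry, BaseProps.class)); // prints "[propsForAZone, propsForBZone, defaultProps]"
        // Asking for a specific subclass is unambiguous.
        System.out.println(getSingle(registry, AZoneProps.class).getClass().getSimpleName()); // prints "AZoneProps"
    }
}
```

In this model, any consumer that asks for the base type becomes ambiguous as soon as a subclass instance is registered, which matches the situation in the error above where the zone-specific properties classes extend KafkaProperties.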
References