Spring-Kafka 에서 카프카 메세지를 수신하기 위해서는 MessageListenrContainer와 Message Listener or @KafkaListenr를 사용하라고 한다.
Spring Kafka의 메세지 리스너 소비방식
1. 리스너 기반(Listener - based) - @KafkaListener 사용
2. Polling 기반(Manually Polling)방식 - KafkaConsumer.poll() 직접 호출
우리는 @KafkaListener를 더 일반적으로 사용한다.
@KafkaListener를 사용한 메세지 리스너
topics = 구독할 Kafka 토픽 지정
groupId = "my-group" 으로 컨슈머 그룹을 설정
메세지가 수신되면 자동으로 listen() 메서드가 수행된다.
@KafkaListener(topics = "my-topic", groupId = "my-group")
public void listen(String message) {
System.out.println("Received: " + message);
}
ConsumerRecord를 활용한 메타데이터 접근
ConsumerRecord<String,String>을 사용하면 메세지의 상세 정보(오프셋, 파티션, 타임스탬프 등)접근이 가능하다.
디버깅 또는 로그 분석 시 유용하다.
@KafkaListener(topics = "my-topic", groupId = "my-group")
public void listen(ConsumerRecord<String, String> record) {
System.out.println("Received message: " + record.value());
System.out.println("From partition: " + record.partition());
System.out.println("Offset: " + record.offset());
}
Acknowledgment를 사용한 수동 커밋(Manual Acknowledgement)
수동 커밋으로 수행하는 방식으로 메세지가 처리된 후 acknowledgement.acknowledge()를 호출해야 Kafka가 성공적으로 소비를 했다고 판단한다.
@KafkaListener(topics = "my-topic", groupId = "my-group")
public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
System.out.println("Processing: " + record.value());
// 메시지 처리 후 수동 커밋
acknowledgment.acknowledge();
}
Batch로 메세지 처리
한 번에 여러개의 메세지를 일괄적으로 처리할 수 있다.
containerFactory 를 batchFactory로 설정해야 사용가능하다.
배치 크기를 조정해 성능을 최적화할 수 있다.
@KafkaListener(topics = "my-topic", groupId = "my-group", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<String, String>> records) {
System.out.println("Received batch of messages: " + records.size());
for (ConsumerRecord<String, String> record : records) {
System.out.println("Message: " + record.value());
}
}
리스너 컨테이너 설정(KafkaListenerContainerFactory)
Spring Kafka에서는 리스너 동작을 커스터마이징하려면 KafkaListenerContainerFactory를 설정해야한다.
기본 컨테이너 팩토리
ConcurrentKafkaListenerContainerFactory 멀티 스레드로 Kafka 메세지를 처리할 수 있도록 지원하는 컨테이너
ConsumerFactory를 주입받아 Kafka 컨슈머의 설정을 지정 가능하다.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
return factory;
}
배치 처리를 위한 컨테이너 설정
factory.setBatchListener(true) 로 배치를 활성화 할 수 있다.
이를 @KafkaListener(containerFactory = "batchFactory")에 적용하면 여러 개의 메시지를 한 번에 처리 가능하다.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> batchFactory(
ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
// 배치 모드 활성화
factory.setBatchListener(true);
return factory;
}
수동 커밋을 위한 컨테이너 설정
setAckMode(ContainerProperties.AckMode.MANUAL) 으로 수동 커밋 모드를 활성화 하 ㄹ수 있다.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> manualAckFactory(
ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
// 수동 커밋 설정
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
return factory;
}
Kafka 메세지를 특정 조건에 맞춰 필터링
메세지의 값이 "ignore"를 포함하면 필터링됨
컨슈머에서 필터링된 메세지는 처리되지 않는다.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> filterFactory(
ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
// 특정 메시지 필터링 (예: "ignore"가 포함된 메시지는 무시)
factory.setRecordFilterStrategy(record -> record.value().contains("ignore"));
return factory;
}
병렬 처리
Kafka 리스너는 기본적으로 하나의 스레드에서 동작하지만, concurrency 옵션을 이용해 여러 개의 스레드에서 병렬로 처리가 가능하다.
concurrency를 통해 3개의 쓰레드로 병렬 메세지 소비
groupId를 사용하는 컨슈머 인스턴스가 토픽의 파티션을 나눠서 병렬 처리
@KafkaListener(topics = "my-topic", groupId = "my-group", concurrency = "3")
public void listen(String message) {
System.out.println("Received: " + message);
}
'Kafka' 카테고리의 다른 글
[Flink] Kafka 의 한계점과 그를 보완할 수 있는 Flink (1) | 2025.03.26 |
---|---|
[kafka+redisson] 올리브영의 재고관리시스템 분석 (0) | 2025.03.20 |
[kafka+redisson] 올리브영의 재고관리시스템 분석 1편 (0) | 2025.03.18 |