카프카와 트랜잭션의 수행 시점
Kafka와 JPA를 함께 사용하면 수행되는 시점에서 차이가 있다. 우선 @Transactional을 적용한 경우 어떤 시점에 트랜잭션이 생성되고 수행되는지 확인해야 한다.
그림을 통해 먼저 확인해보자. 유저의 요청이 들어오면 Controller 에서 요청을 받아 Service Layer에서 request에서 받은 요청 또는 조회해서 반환할 내용을 DB에서 가져오게된다. 데이터의 변화가 발생했다면 @Transactional은 영속성 컨텍스트에 있는 1차 캐시(한 번 이미 조회하여 데이터가 있다고 가정)와 Snapshot을 비교하게 되는데 이때 더티체킹을 하여 변화가 있다면 Update 구문을 발생하게 된다. 이 쿼리는 메서드가 끝나는 시점에 트랜잭션이 수행되고 DB에 커밋되게 된다.
하지만 Kafka로 이벤트를 전달하는 메서드가 로직에 함께 있다면 어떻게 될까? 트랜잭션이 커밋되기 이전 시점에 카프카가 이벤트를 발생시키고 토픽을 통해 전송하게 된다.
다시 말하면 Transaction이 수행되는 과정에 문제가 생겨도 Kafka Message는 발행이 된다는 이야기이다.

문제가 발생하는 경우
아래 그림을 통해 어떻게 문제가 생기는지 확인해보자.
서비스레이어에서 kafkaEvent를 발송하게된 후 DB에 Transaction이 수행됐으나 어떠한 문제로 트랜잭션이 실패하여 롤백되었다.
하지만 카프카 이벤트는 트랜잭션과 별개로 수행되는 Kafka의 트랜잭션은 별도로 동작하기 때문에 별도의 설정이 필요하다.
그럼 이 문제를 어떻게 해결할 수 있을까?

코드와 콘솔을 보면 더 확실하게 확인할 수 있다.
Controller Layer
@GetMapping("/{boardId}")
public ResponseEntity<BoardResponseDto> find(
@PathVariable Long boardId,
@RequestHeader("X-User-Passport") String passport
){
BoardResponseDto boardResponseDto = boardService.findBoard(boardId,passport);
log.info("데이터 반환");
return ResponseEntity.ok(boardResponseDto);
}
Service Layer
@Override
@Transactional(isolation = Isolation.READ_COMMITTED)
public BoardResponseDto findBoard(Long boardId, String stringPassportDto) {
Board board = getBoard(boardId);
PassportDto passportDto = getPassportDto(stringPassportDto);
board.setViewCount(board.getViewCount() + 1);
kafkaProducer.sendPostViewedEvent(boardId, passportDto);
log.info("데이터 생성중");
return BoardResponseDto.toBoardResponseDto(board);
}
결과
KafkaInterCeptor에서 수행중과 수행 후 처리 로깅을 적용했다.
아래 로그를 보면 먼저 Kafka Transaction이 수행되고 데이터가 생성중이라는 로그가 나간 후 메서드가 종료되면서 쿼리가 나간다.

해결 방법 1 - Transaction의 Isloation Level = Read_Commited
트랜잭션의 고립 수준을 Read_Commited로 설정하는 방법이다. 해당 방법을 적용시키면 어떻게 변경될까?
Kafka Transaction이 Commit된 이벤트만 구독하게 된다.
만약 고립 레벨에 대해 아직 잘 모른다면 트랜잭션 고립수준 포스팅을 먼저 보고 오기 바란다!
해당 레벨은 말 그대로 커밋된 것만 조회가 가능하게 만든다는 레벨이다.
커밋이 확실히 수행된 데이터만 조회가 가능하기에 Dirty Read는 발생하지 않으나 Non-Repeatable 문제가 발생할 수는 있다.

Kafka Transaction 적용 시 주의 사항
1. 토픽의 메세지를 조회할 경우 토픽에 있는 메세지가 예외처리 과정에서 발생한 메세지일 수 있기에 이를 인지할 수 있는 별도의 키워드를 둬야 한다.
2. Consumer또한 Isolation.level 설정을 적용해야 한다.
3. Kafka Transaction이 적용된 이벤트는 offset이 반드시 1만 증가되지 않는다. 그 이유는 아래와 같다.
Kafka의 내부동작은 다음과 같다.
1. 배치
Kafka 프로듀서는 성능 최적화를 위해서 배치 단위로 보내며, 배치가 완료되면 한 번에 기록됨
배치 크기나 네트워크 지연으로 인해 일부 배치가 한 꺼번에 커밋되면서, 오프셋이 연속되지 않을 수 있다.
2. 압축
Kafka는 메시지 압축 기능을 제공하며, 압축된 데이터를 효율적으로 처리하기 위해 오프셋이 연속되지 않을 수 있다.
압축된 메시지는 병렬로 처리되기 때문에, 특정 메시지가 빠르게 커밋되면서 건너뛰는 현상이 발생할 수 있다.
3. 재전송
Kfaka 프로듀서는 메세지를 보내다가 실패하면 재전송을 할 수 있다.
일부 메세지가 먼저 전송되었다가 실패 후 재전송 할 경우, 다른 메세지보다 나중에 커밋될 수 있다.
이 경우, 오프셋이 건너뛰어지는 현상을 발생할 수 있다.
4. 병렬 처리
Kafak Streams 애플리케이션이 병렬로 처리되는 경우, 여러 쓰레드가 동시에 데이터를 처리한다.
병렬로 실행되는 경우, 메세지가 저장되는 순서가 보장되지 않아 연속된 오프셋이 아닐 수도 있다.
4. Kafka Tranasaction이 적용된 이벤트가 마지막일 경우 Consumer Group의 Log이 1이다. 이 설명 또한 아래의 내용을 읽어보면 이해할 수 있다.
Kafka Consumer의 Lag 동작 방식
Consumer Lag은 Producer가 마지막으로 보낸 메세지의 오프셋과 Consumer가 커밋한 오프셋 차이를 나타낸다.
일반적으로 Consumer가 모든 메세지를 처리하면 Lag=0으로 표현되어야 하지만, 특정 상황에서는 1이 유지될 수 있다.
Kfak Connect의 내부처리 방식
Kafka Connect의 Sink Connector는 "at-leaset-once" 전달 보장을 제공하기 위해 특정 방식으로 동작한다.
내부적으로 offset을 커밋하는 시점과 메세지 처리가 끝나는 시점이 다를 수 있다.
Connector가 마지막 오프셋을 커밋하기 전에 새로운 오프셋을 확인하는 경우, Lag이 1로 유지될 수 있다.
해결 방법
1. offset flush 주기를 줄이기
offset.flush.interval.ms=1000 (주기적인 오프셋 커밋을 적절하게 줄여준다.)
2. Connector가 처리할 최소 배치 크기를 조정한다.
tasks.max =1 (병렬 작업 수를 줄이면 오프셋 커밋이 더 장확하게 이뤄질 수 있다.
3. Consumer의 auto.offset.reset 설정 확인
auto.offset.reset=latest (latest로 설정하면 처음부터 처리되지 않은 메세지만 가져오므로 불필요한 lag증가를 방지할 수 있다)
해결 방법2 - TransactionalEventListenr 활용
이 방법을 적용하면 TransactionalEventListenr를 활용하여 TransactionPhase.AFTER_COMMIT 이후에 kafkaTempalte.send를 실행시키는 방법이다. Transaction의 Commit 이후에만 이벤트가 발생되며 예외가 발생한 경우에는 아예 이벤트가 발행되지 않게 된다.
TransactionEventLister를 활용하면 Akfka Transaction 적용시 Consumer가 고려하지 않아도 된다는 점이 있지만 각 트랜잭션마다 별도의 이벤트 리스너를 만들어 둬야 한다는 단점이 있다.

수정된 Service Layer
@Override
@Transactional
public BoardResponseDto findBoard(Long boardId, String stringPassportDto) {
Board board = getBoard(boardId);
PassportDto passportDto = getPassportDto(stringPassportDto);
board.setViewCount(board.getViewCount() + 1);
eventPublisher.publishEvent(new ViewEvent(boardId, passportDto));
log.info("데이터 생성중");
return BoardResponseDto.toBoardResponseDto(board);
}
추가된 EventListener
@Component
@RequiredArgsConstructor
@Slf4j
public class BoardEventListener {
private final KafkaProducer kafkaProducer;
// 트랜잭션 커밋 후 실행
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handleViewEvent(ViewEvent event) {
log.info("트랜잭션 커밋 후 Kafka 메시지 전송: BoardId = {}", event.getBoardId());
kafkaProducer.sendPostViewedEvent(event.getBoardId(), event.getPassportDto());
}
}
결과
이처럼 쿼리가 나가고 성공적으로 커밋되거나 성공적인 조회 후 Kafka메세지가 발행된다.

'Spring > JPA' 카테고리의 다른 글
| [JPA] JPA vs MySQL vs JDBC vs JPQL vs QueryDSL (0) | 2025.03.11 |
|---|---|
| [JPA] @Transaction을 붙이지 않았을 때 생기는 문제, JPA Proxy, LazyLoading과 EagerLoading N+1 문제 (0) | 2025.02.22 |
| [JPA] Soft Delete 개발 방법(Hard Delete과의 차이) (0) | 2025.02.17 |
| [개념 정리]Spring Boot에서 JPA의 Soft Delete와 Cascade 연관관계 (1) | 2024.12.09 |
| save the transient instance before flushing (1) | 2024.11.20 |