지난 글에서 Kafka의 기본의 되는 개념을 공부했다. 이제 프로젝트에서 적용해보자.
1. 카프카, 주키퍼 서버 실행
다음과 같이 docker-compose 파일 작성 후 docker-compose up -d 명령어로 kafka 서버를 실행한다. (각 요소가 무엇인지 알고 싶고 또 다른 설정을 하고 싶다면 다음의 링크 참고 https://www.baeldung.com/ops/kafka-docker-setup)
version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
2. application.yml 설정
실행된 카프카 서버와 어플리케이션을 연동하기 위해 다음의 설정을 해준다.
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
properties:
retries: 3
retry.backoff.ms: 1000
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
auto-offset-reset: earliest
properties:
spring.json.trusted.packages: com.unicornstudy.singleshop.*
직관적으로 각 요소에 있는 설정이 무엇을 의미하는지 알 수 있을 것이다.
카프카에 메시지를 전송할 때 카프카 템플릿을 이용하는데 이 때 <토픽이름, 전송할 메시지> 형식으로 되어있다. 따라서 해당 값들을 직렬화 역직렬화를 하기 위해 key, value에 관한 적절한 설정을 해줘야 한다.
producer의 properties
- 카프카가 메시지 전송을 실패 할 때 무한한 재시도를 막기 위해 재시도 횟수와 각 시도별 연장되는 시간을 설정했다.
conumer의 properties
- consumer에 있는 properties에 해당하는 값은 컨슈머에서 역직렬화를 수행할 때 신뢰할 수 있는 객체인지 패키지를 설정해줘야 하기 때문에 설정했다.
3. 프로듀서 작성
package com.unicornstudy.singleshop.items.command.application.aop;
import com.unicornstudy.singleshop.items.command.application.dto.ItemsResponseDto;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import java.util.concurrent.CompletableFuture;
@Aspect
@Component
@Slf4j
@RequiredArgsConstructor
public class ItemsAspect {
private final KafkaTemplate<String, Object> kafkaTemplate;
@AfterReturning(pointcut = "execution(* com.unicornstudy.singleshop.items.command.application.ItemsService.save(..)) " +
"|| execution(* com.unicornstudy.singleshop.items.command.application.ItemsService.update(..))" +
"|| execution(* com.unicornstudy.singleshop.items.command.application.ItemsService.subtractQuantity(..))" +
"|| execution(* com.unicornstudy.singleshop.items.command.application.ItemsService.addQuantity(..))"
, returning = "itemsResponseDto")
public void sendToCreateAndUpdateMessageQueue(ItemsResponseDto itemsResponseDto) {
log.info("ItemsService 저장 성공, 메시지큐에 메시지 전송");
sendToMessageQueue("ItemsCreateAndUpdateTopic", "CreateAndUpdateDLQ", itemsResponseDto);
}
@AfterReturning(pointcut = "execution(* com.unicornstudy.singleshop.items.command.application.ItemsService.delete(..))", returning = "id")
public void sendToDeleteMessageQueue(Long id) {
log.info("ItemsService 삭제 성공, 메시지큐에 메시지 전송");
sendToMessageQueue("ItemsDeleteTopic", "DeleteDLQ", id);
}
private void sendToMessageQueue(String topic, String dlq, Object payload) {
CompletableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, payload);
future.whenComplete((result, exception) -> {
if (exception != null) {
log.error("메시지 전달 실패 Dead Letter Queue에 메시지 전송");
kafkaTemplate.send(dlq, payload);
} else {
log.info("메시지 큐에 전송 성공");
}
});
}
}
프로듀서는 위와 같이 AOP를 적용시켰다. CUD 작업이 성공적으로 수행할 때 kafka에 메시지를 전송하도록 했으며 future 클래스를 통해 메시지 전송 실패시 Dead Letter Queue에 메시지를 전송하도록 설정했다.
4. 컨슈머 작성
package com.unicornstudy.singleshop.items.command.application.messageQueue.kafka;
import com.unicornstudy.singleshop.items.command.application.dto.ItemsResponseDto;
import com.unicornstudy.singleshop.items.query.domain.repository.ItemsSearchRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
@Slf4j
public class ItemsEventListener {
private final ItemsSearchRepository itemsSearchRepository;
@KafkaListener(topics = "ItemsCreateAndUpdateTopic", groupId = "items")
public void createAndUpdateItemsIndex(ItemsResponseDto itemsResponseDto) {
log.info("ItemsCreateAndUpdateTopic 큐 메시지 도착");
itemsSearchRepository.save(itemsResponseDto.toEntity());
}
@KafkaListener(topics = "ItemsDeleteTopic", groupId = "items")
public void deleteItemsIndex(Long id) {
log.info("ItemsDeleteTopic 큐 메시지 도착");
itemsSearchRepository.deleteById(id);
}
}
컨슈머에서는 메시지를 요구사항에 맞게 소비하면 된다.
5. 예외처리
package com.unicornstudy.singleshop.common.infrastructure.kafka;
import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;
@Configuration
@RequiredArgsConstructor
public class kafkaConfig {
private final KafkaTemplate<String, String> kafkaTemplate;
private final long INTERVAL_TIME = 1L;
private final long MAX_ATTEMPTS = 3L;
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> multiTypeKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setCommonErrorHandler(errorHandler());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
factory.afterPropertiesSet();
return factory;
}
@Bean
public DefaultErrorHandler errorHandler() {
DefaultErrorHandler errorHandler = new DefaultErrorHandler(
new DeadLetterPublishingRecoverer(kafkaTemplate), new FixedBackOff(INTERVAL_TIME, MAX_ATTEMPTS));
return errorHandler;
}
}
RECORD 옵션을 설정해서 메시지가 소비될 때 메소드가 예외를 던진다면 메시지 소비를 실패하도록 해줬다. 또한 error가 발생하면 DLQ에 메시지를 전송하도록 햇으며 재시도 횟수는 3회, 시도당 연장시간은 1초로 설정했다.
이외에도 자세한 설정을 할 수 있고 더 섬세한 설정을 하고 싶다면 공식문서를 참고하자.
+ 최범균님의 카프카 아는척하기 시리즈를 본다면 많이 도움이 될 것이다.
마치며
CQRS 패턴을 적용하고 카프카까지 연동하는 과정을 설명했다. 새로운 기술을 적용할 때 마다 느끼는 점이지만 새로운 기술을 적용하는데는 꽤나 많은 비용이 소모된다. (요즘에는 문서도 잘나오고 훌륭한 개발자분들의 강의도 있어 비교적 쉬운 편이다.) 따라서 기술을 사용하기 전 충분한 근거를 갖고 사용함과 동시에 해당 기술의 개념 정도는 확실하게 이해하고 적용하는 것 중요하다고 느껴진다.
'프로젝트 > 고민' 카테고리의 다른 글
Slack Webhook API 사용법 (0) | 2023.05.31 |
---|---|
Elasticsearch 쿼리빌더로 리팩토링 (0) | 2023.05.23 |
CQRS패턴 도입 이야기: Kafka 조금 알아보기 (0) | 2023.05.22 |
CQRS패턴 도입 이야기: CQRS 구현 방법 (0) | 2023.05.22 |
CQRS패턴 도입 이야기: CQRS 패턴이란? (0) | 2023.05.21 |