본문 바로가기

프로젝트/고민

CQRS패턴 도입 이야기: 프로젝트에 Kafka 적용하기

지난 글에서 Kafka의 기본의 되는 개념을 공부했다. 이제 프로젝트에서 적용해보자.

 

1. 카프카, 주키퍼 서버 실행

다음과 같이 docker-compose 파일 작성 후 docker-compose up -d 명령어로 kafka 서버를 실행한다. (각 요소가 무엇인지 알고 싶고 또 다른 설정을 하고 싶다면 다음의 링크 참고 https://www.baeldung.com/ops/kafka-docker-setup)

 

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

 

 

 

2. application.yml 설정

실행된 카프카 서버와 어플리케이션을 연동하기 위해 다음의 설정을 해준다.

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      properties:
        retries: 3
        retry.backoff.ms: 1000
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      auto-offset-reset: earliest
      properties:
        spring.json.trusted.packages: com.unicornstudy.singleshop.*

직관적으로 각 요소에 있는 설정이 무엇을 의미하는지 알 수 있을 것이다.

 

카프카에 메시지를 전송할 때 카프카 템플릿을 이용하는데 이 때 <토픽이름, 전송할 메시지> 형식으로 되어있다. 따라서 해당 값들을 직렬화 역직렬화를 하기 위해 key, value에 관한 적절한 설정을 해줘야 한다.

 

producer의 properties

  • 카프카가 메시지 전송을 실패 할 때 무한한 재시도를 막기 위해 재시도 횟수와 각 시도별 연장되는 시간을 설정했다.

 

conumer의 properties

  • consumer에 있는 properties에 해당하는 값은 컨슈머에서 역직렬화를 수행할 때 신뢰할 수 있는 객체인지 패키지를 설정해줘야 하기 때문에 설정했다.

 

3. 프로듀서 작성

package com.unicornstudy.singleshop.items.command.application.aop;

import com.unicornstudy.singleshop.items.command.application.dto.ItemsResponseDto;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;

import java.util.concurrent.CompletableFuture;

@Aspect
@Component
@Slf4j
@RequiredArgsConstructor
public class ItemsAspect {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    @AfterReturning(pointcut = "execution(* com.unicornstudy.singleshop.items.command.application.ItemsService.save(..)) " +
            "|| execution(* com.unicornstudy.singleshop.items.command.application.ItemsService.update(..))" +
            "|| execution(* com.unicornstudy.singleshop.items.command.application.ItemsService.subtractQuantity(..))" +
            "|| execution(* com.unicornstudy.singleshop.items.command.application.ItemsService.addQuantity(..))"
            , returning = "itemsResponseDto")
    public void sendToCreateAndUpdateMessageQueue(ItemsResponseDto itemsResponseDto) {
        log.info("ItemsService 저장 성공, 메시지큐에 메시지 전송");
        sendToMessageQueue("ItemsCreateAndUpdateTopic", "CreateAndUpdateDLQ", itemsResponseDto);
    }

    @AfterReturning(pointcut = "execution(* com.unicornstudy.singleshop.items.command.application.ItemsService.delete(..))", returning = "id")
    public void sendToDeleteMessageQueue(Long id) {
        log.info("ItemsService 삭제 성공, 메시지큐에 메시지 전송");
        sendToMessageQueue("ItemsDeleteTopic", "DeleteDLQ", id);
    }

    private void sendToMessageQueue(String topic, String dlq, Object payload) {
        CompletableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, payload);

        future.whenComplete((result, exception) -> {
            if (exception != null) {
                log.error("메시지 전달 실패 Dead Letter Queue에 메시지 전송");
                kafkaTemplate.send(dlq, payload);
            } else {
                log.info("메시지 큐에 전송 성공");
            }
        });
    }
}

프로듀서는 위와 같이 AOP를 적용시켰다. CUD 작업이 성공적으로 수행할 때 kafka에 메시지를 전송하도록 했으며 future 클래스를 통해 메시지 전송 실패시 Dead Letter Queue에 메시지를 전송하도록 설정했다.

 

4. 컨슈머 작성

package com.unicornstudy.singleshop.items.command.application.messageQueue.kafka;

import com.unicornstudy.singleshop.items.command.application.dto.ItemsResponseDto;
import com.unicornstudy.singleshop.items.query.domain.repository.ItemsSearchRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
@Slf4j
public class ItemsEventListener {

    private final ItemsSearchRepository itemsSearchRepository;

    @KafkaListener(topics = "ItemsCreateAndUpdateTopic", groupId = "items")
    public void createAndUpdateItemsIndex(ItemsResponseDto itemsResponseDto) {
        log.info("ItemsCreateAndUpdateTopic 큐 메시지 도착");
        itemsSearchRepository.save(itemsResponseDto.toEntity());
    }

    @KafkaListener(topics = "ItemsDeleteTopic", groupId = "items")
    public void deleteItemsIndex(Long id) {
        log.info("ItemsDeleteTopic 큐 메시지 도착");
        itemsSearchRepository.deleteById(id);
    }
}

 

컨슈머에서는 메시지를 요구사항에 맞게 소비하면 된다.

 

 

 

5.  예외처리

package com.unicornstudy.singleshop.common.infrastructure.kafka;

import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
@RequiredArgsConstructor
public class kafkaConfig {

    private final KafkaTemplate<String, String> kafkaTemplate;

    private final long INTERVAL_TIME = 1L;

    private final long MAX_ATTEMPTS = 3L;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> multiTypeKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setCommonErrorHandler(errorHandler());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
        factory.afterPropertiesSet();
        return factory;
    }

    @Bean
    public DefaultErrorHandler errorHandler() {
        DefaultErrorHandler errorHandler = new DefaultErrorHandler(
                new DeadLetterPublishingRecoverer(kafkaTemplate), new FixedBackOff(INTERVAL_TIME, MAX_ATTEMPTS));
        return errorHandler;
    }
}

 

RECORD 옵션을 설정해서 메시지가 소비될 때  메소드가 예외를 던진다면 메시지 소비를 실패하도록 해줬다. 또한 error가 발생하면 DLQ에 메시지를 전송하도록 햇으며 재시도 횟수는 3회, 시도당 연장시간은 1초로 설정했다.

 

이외에도 자세한 설정을 할 수 있고 더 섬세한 설정을 하고 싶다면 공식문서를 참고하자.

 

+ 최범균님의 카프카 아는척하기 시리즈를 본다면 많이 도움이 될 것이다.

 

마치며

CQRS 패턴을 적용하고 카프카까지 연동하는 과정을 설명했다. 새로운 기술을 적용할 때 마다 느끼는 점이지만 새로운 기술을 적용하는데는 꽤나 많은 비용이 소모된다. (요즘에는 문서도 잘나오고 훌륭한 개발자분들의 강의도 있어 비교적 쉬운 편이다.) 따라서 기술을 사용하기 전 충분한 근거를 갖고 사용함과 동시에 해당 기술의 개념 정도는 확실하게 이해하고 적용하는 것 중요하다고 느껴진다.