Kafka 적응기

2024. 11. 13. 17:23Java

기존 프로젝트에 Kafka를 추가하면 좋은 거 같아서 사용을 위해 글을 작성합니다.

현재 RabbitMQ만을 사용하고 있으며 Kafka를 사용하는 것에 이점을 작성 후 코드에 대한 설명을 하려고 합니다.

### 어떤 점에 좋은가?

  1. 대용량 처리
  • Redis, PostgreSQL을 사용하고 있는 상황이라 데이터 처리량이 많을 것을 전제
  • Kafka는 대용량 메세지 처리에 적합
  1. 이벤트 저장
  • Kafka는 이벤트를 디스크에 저장, 필요할 때 재생 가능
  • 심부름/주문 시스템의 경우 이벤트 히스토리 관리가 중요 할 수 있음
  1. 현재 아키텍처에의 조화
  • Spring Boot, Redis, PostgreSQL 등 현대적인 스택을 사용중
  • Kafka가 이런 MSA 환경에 적합
  1. 확장성
  • WebSocket을 사용하는 실시간 처리가 필요한 서비스
  • Kafka의 파티셔닝을 통해 수평적 확장에 용이하다.

application.yml 에 kafka 설정 추가:

spring:
  kafka:
    bootstrap-servers: ${KAFKA_SERVERS:localhost:9092}
    consumer:
      group-id: absurdity-group
      auto-offset-reset: earliest
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

또한 도커를 사용하기 위해
docker-compose.yml 도 추가하겠습니다.

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "22181:2181"

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

Config 설정 흐름

KafkaConfig
↓
1. ObjectMapper 설정
   - JavaTimeModule 등록
   - 날짜 타임스탬프 비활성화

2. Producer 설정
   - 서버: localhost:29092
   - Key: StringSerializer
   - Value: JsonSerializer

3. Consumer 설정
   - 서버: localhost:29092
   - Group ID: absurdity-group
   - Key: StringDeserializer
   - Value: JsonDeserializer
   - 신뢰할 수 있는 패키지: 전체(*)

KafkaConfig.class

@Configuration
public class KafkaConfig {

    // JSON 직렬화/역직렬화를 위한 ObjectMapper 설정
    @Bean
    public ObjectMapper objectMapper() {
        return new ObjectMapper()
                .registerModule(new JavaTimeModule())
                .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);  // 날짜를 타임스탬프가 아닌 ISO-8601 형식으로 직렬화
    }

    // Kafka Producer 설정
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> config = new HashMap<>();

        // 기본 설정
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        config.put(ProducerConfig.ACKS_CONFIG, "all");  // 모든 복제본이 메시지를 받았는지 확인
        config.put(ProducerConfig.RETRIES_CONFIG, 3);   // 실패 시 재시도 횟수
        config.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);  // 배치 크기 설정
        config.put(ProducerConfig.LINGER_MS_CONFIG, 1);  // 배치 전송 대기 시간

        return new DefaultKafkaProducerFactory<>(config);
    }

    // Kafka Consumer 설정
    @Bean
    public ConsumerFactory<String, ErrandStatusMessage> consumerFactory() {
        Map<String, Object> config = new HashMap<>();

        // 기본 설정
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "absurdity-group");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");

        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");  // 처음부터 메시지 읽기
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);  // 수동 커밋 설정
        config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);  // 한 번에 가져올 최대 레코드 수

        return new DefaultKafkaConsumerFactory<>(
            config, 
            new StringDeserializer(), 
            new JsonDeserializer<>(ErrandStatusMessage.class, objectMapper())
        );
    }

    // Kafka Template 설정
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    // Kafka Listener Container Factory 설정
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, ErrandStatusMessage> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, ErrandStatusMessage> factory = 
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        // 에러 처리 및 재시도 정책 설정
        factory.setErrorHandler(new SeekToCurrentErrorHandler(
            new DeadLetterPublishingRecoverer(kafkaTemplate()),
            new FixedBackOff(1000L, 3L)  // 1초 간격으로 3번 재시도
        ));

        // 배치 리스너 설정
        factory.setBatchListener(true);

        return factory;
    }

    // Topic 설정
    @Bean
    public NewTopic errandTopic() {
        return TopicBuilder.name("errand-status-topic")
                .partitions(3)  // 병렬 처리를 위한 파티션 수
                .replicas(1)    // 복제본 수 (운영 환경에서는 2 이상 권장)
                .config(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(24 * 60 * 60 * 1000)) // 24시간 보관
                .build();
    }
}

버전에 따른 차이:
Spring Kafka 2.8 이전: setErrorHandler 사용
Spring Kafka 2.8 이후: setCommonErrorHandler 사용 (권장)

메시지 흐름

Producer 요청
    
ErrandStatusMessage 생성
    - errandId
    - status
    - timestamp
    
KafkaTemplate으로 메시지 전송
    - Topic: errand-status-topic
    - Key: errandId
    - Value: ErrandStatusMessage
    
Consumer가 메시지 수신
    - @KafkaListener 어노테이션으로 처리
    - 로그 기록
// 심부름 상태 메시지를 위한 Record 클래스
public record ErrandStatusMessage(
    @NotNull(message = "심부름 ID는 필수입니다")
    Long errandId,

    @NotBlank(message = "상태 값은 필수입니다")
    String status,

    @NotNull(message = "시간 정보는 필수입니다")
    LocalDateTime timeStamp
) {
    // 팩토리 메서드
    public static ErrandStatusMessage of(
        final Long errandId, 
        final String status
    ) {
        return new ErrandStatusMessage(
            Objects.requireNonNull(errandId, "심부름 ID는 null일 수 없습니다"),
            Objects.requireNonNull(status, "상태는 null일 수 없습니다"),
            LocalDateTime.now()
        );
    }

    // 유효성 검증 메서드
    public void validate() {
        if (errandId <= 0) {
            throw new IllegalArgumentException("심부름 ID는 양수여야 합니다");
        }
        if (status.trim().isEmpty()) {
            throw new IllegalArgumentException("상태는 비어있을 수 없습니다");
        }
    }
}

Producer && Consumer

@Service
@RequiredArgsConstructor
@Slf4j
public class KafkaProducer {
    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final MeterRegistry meterRegistry;  // 모니터링을 위한 메트릭 등록

    public void sendErrandStatus(ErrandStatusMessage message) {
        message.validate();  // 메시지 유효성 검증

        try {
            // 메시지 전송 및 결과 처리
            kafkaTemplate.send("errand-status-topic",
                    String.valueOf(message.errandId()), message)
                    .whenComplete((result, ex) -> {
                        if (ex == null) {
                            log.info("Message sent successfully for errand id: {}", 
                                message.errandId());
                            // 메트릭 기록
                            meterRegistry.counter("kafka.messages.sent.success").increment();
                        } else {
                            log.error("Failed to send message", ex);
                            meterRegistry.counter("kafka.messages.sent.failure").increment();
                        }
                    });
        } catch (Exception e) {
            log.error("Error sending message to Kafka", e);
            meterRegistry.counter("kafka.messages.sent.error").increment();
            throw new KafkaException("Failed to send message to Kafka", e);
        }
    }
}

이러한 구성을 통해:

  • 메시지의 안정적인 전송과 수신
  • 에러 상황에 대한 적절한 처리
  • 메시지 유효성 검증
  • 모니터링 및 메트릭 수집

이렇게 구성하고 있으며 다음 글에서 Test와 개선할 수 있는 사항 그리고 실행 및 도커 실행 등을 같이 포스팅 해보겠습니다.

'Java' 카테고리의 다른 글

RedisTemplate 의 객체지향  (1) 2024.11.20
Controller Restful 관련  (0) 2024.11.11
JWT 그런데 OAuth2.0 곁들인 (2)  (1) 2024.11.10
JWT 그런데 OAuth2.0 곁들인  (1) 2024.11.08
SSE(Server-Sent Events)  (0) 2024.11.06