Kafka 적응기
2024. 11. 13. 17:23ㆍJava
기존 프로젝트에 Kafka를 추가하면 좋은 거 같아서 사용을 위해 글을 작성합니다.
현재 RabbitMQ만을 사용하고 있으며 Kafka를 사용하는 것에 이점을 작성 후 코드에 대한 설명을 하려고 합니다.
### 어떤 점에 좋은가?
- 대용량 처리
- Redis, PostgreSQL을 사용하고 있는 상황이라 데이터 처리량이 많을 것을 전제
- Kafka는 대용량 메세지 처리에 적합
- 이벤트 저장
- Kafka는 이벤트를 디스크에 저장, 필요할 때 재생 가능
- 심부름/주문 시스템의 경우 이벤트 히스토리 관리가 중요 할 수 있음
- 현재 아키텍처에의 조화
- Spring Boot, Redis, PostgreSQL 등 현대적인 스택을 사용중
- Kafka가 이런 MSA 환경에 적합
- 확장성
- WebSocket을 사용하는 실시간 처리가 필요한 서비스
- Kafka의 파티셔닝을 통해 수평적 확장에 용이하다.
application.yml 에 kafka 설정 추가:
spring:
kafka:
bootstrap-servers: ${KAFKA_SERVERS:localhost:9092}
consumer:
group-id: absurdity-group
auto-offset-reset: earliest
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
또한 도커를 사용하기 위해
docker-compose.yml 도 추가하겠습니다.
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "22181:2181"
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- "29092:29092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
Config 설정 흐름
KafkaConfig
↓
1. ObjectMapper 설정
- JavaTimeModule 등록
- 날짜 타임스탬프 비활성화
2. Producer 설정
- 서버: localhost:29092
- Key: StringSerializer
- Value: JsonSerializer
3. Consumer 설정
- 서버: localhost:29092
- Group ID: absurdity-group
- Key: StringDeserializer
- Value: JsonDeserializer
- 신뢰할 수 있는 패키지: 전체(*)
KafkaConfig.class
@Configuration
public class KafkaConfig {
// JSON 직렬화/역직렬화를 위한 ObjectMapper 설정
@Bean
public ObjectMapper objectMapper() {
return new ObjectMapper()
.registerModule(new JavaTimeModule())
.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); // 날짜를 타임스탬프가 아닌 ISO-8601 형식으로 직렬화
}
// Kafka Producer 설정
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> config = new HashMap<>();
// 기본 설정
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
config.put(ProducerConfig.ACKS_CONFIG, "all"); // 모든 복제본이 메시지를 받았는지 확인
config.put(ProducerConfig.RETRIES_CONFIG, 3); // 실패 시 재시도 횟수
config.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 배치 크기 설정
config.put(ProducerConfig.LINGER_MS_CONFIG, 1); // 배치 전송 대기 시간
return new DefaultKafkaProducerFactory<>(config);
}
// Kafka Consumer 설정
@Bean
public ConsumerFactory<String, ErrandStatusMessage> consumerFactory() {
Map<String, Object> config = new HashMap<>();
// 기본 설정
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "absurdity-group");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 처음부터 메시지 읽기
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 수동 커밋 설정
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); // 한 번에 가져올 최대 레코드 수
return new DefaultKafkaConsumerFactory<>(
config,
new StringDeserializer(),
new JsonDeserializer<>(ErrandStatusMessage.class, objectMapper())
);
}
// Kafka Template 설정
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
// Kafka Listener Container Factory 설정
@Bean
public ConcurrentKafkaListenerContainerFactory<String, ErrandStatusMessage> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, ErrandStatusMessage> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// 에러 처리 및 재시도 정책 설정
factory.setErrorHandler(new SeekToCurrentErrorHandler(
new DeadLetterPublishingRecoverer(kafkaTemplate()),
new FixedBackOff(1000L, 3L) // 1초 간격으로 3번 재시도
));
// 배치 리스너 설정
factory.setBatchListener(true);
return factory;
}
// Topic 설정
@Bean
public NewTopic errandTopic() {
return TopicBuilder.name("errand-status-topic")
.partitions(3) // 병렬 처리를 위한 파티션 수
.replicas(1) // 복제본 수 (운영 환경에서는 2 이상 권장)
.config(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(24 * 60 * 60 * 1000)) // 24시간 보관
.build();
}
}
버전에 따른 차이:
Spring Kafka 2.8 이전: setErrorHandler 사용
Spring Kafka 2.8 이후: setCommonErrorHandler 사용 (권장)
메시지 흐름
Producer 요청
↓
ErrandStatusMessage 생성
- errandId
- status
- timestamp
↓
KafkaTemplate으로 메시지 전송
- Topic: errand-status-topic
- Key: errandId
- Value: ErrandStatusMessage
↓
Consumer가 메시지 수신
- @KafkaListener 어노테이션으로 처리
- 로그 기록
// 심부름 상태 메시지를 위한 Record 클래스
public record ErrandStatusMessage(
@NotNull(message = "심부름 ID는 필수입니다")
Long errandId,
@NotBlank(message = "상태 값은 필수입니다")
String status,
@NotNull(message = "시간 정보는 필수입니다")
LocalDateTime timeStamp
) {
// 팩토리 메서드
public static ErrandStatusMessage of(
final Long errandId,
final String status
) {
return new ErrandStatusMessage(
Objects.requireNonNull(errandId, "심부름 ID는 null일 수 없습니다"),
Objects.requireNonNull(status, "상태는 null일 수 없습니다"),
LocalDateTime.now()
);
}
// 유효성 검증 메서드
public void validate() {
if (errandId <= 0) {
throw new IllegalArgumentException("심부름 ID는 양수여야 합니다");
}
if (status.trim().isEmpty()) {
throw new IllegalArgumentException("상태는 비어있을 수 없습니다");
}
}
}
Producer && Consumer
@Service
@RequiredArgsConstructor
@Slf4j
public class KafkaProducer {
private final KafkaTemplate<String, Object> kafkaTemplate;
private final MeterRegistry meterRegistry; // 모니터링을 위한 메트릭 등록
public void sendErrandStatus(ErrandStatusMessage message) {
message.validate(); // 메시지 유효성 검증
try {
// 메시지 전송 및 결과 처리
kafkaTemplate.send("errand-status-topic",
String.valueOf(message.errandId()), message)
.whenComplete((result, ex) -> {
if (ex == null) {
log.info("Message sent successfully for errand id: {}",
message.errandId());
// 메트릭 기록
meterRegistry.counter("kafka.messages.sent.success").increment();
} else {
log.error("Failed to send message", ex);
meterRegistry.counter("kafka.messages.sent.failure").increment();
}
});
} catch (Exception e) {
log.error("Error sending message to Kafka", e);
meterRegistry.counter("kafka.messages.sent.error").increment();
throw new KafkaException("Failed to send message to Kafka", e);
}
}
}
이러한 구성을 통해:
- 메시지의 안정적인 전송과 수신
- 에러 상황에 대한 적절한 처리
- 메시지 유효성 검증
- 모니터링 및 메트릭 수집
이렇게 구성하고 있으며 다음 글에서 Test와 개선할 수 있는 사항 그리고 실행 및 도커 실행 등을 같이 포스팅 해보겠습니다.
'Java' 카테고리의 다른 글
RedisTemplate 의 객체지향 (1) | 2024.11.20 |
---|---|
Controller Restful 관련 (0) | 2024.11.11 |
JWT 그런데 OAuth2.0 곁들인 (2) (1) | 2024.11.10 |
JWT 그런데 OAuth2.0 곁들인 (1) | 2024.11.08 |
SSE(Server-Sent Events) (0) | 2024.11.06 |