Kafka 난항기

2024. 11. 15. 15:59TroublesShooting

Kafka를 현 프로젝트에 추가하고 여러가지를 시도해보자 하는 마음이 결국 에러가 발생했다.
기쁘지 않은 마음으로 트러블슈팅을 작성해서 기록하자..
어렵다 어려워

코드를 작성하다보면 자주 만나는 이 녀석

***************************
APPLICATION FAILED TO START
***************************

Description:

A component required a bean named 'KafkaListenerContainerFactory' that could not be found.


Action:

Consider defining a bean named 'KafkaListenerContainerFactory' in your configuration.


Process finished with exit code 1

비교적 친절하게 알려준다
"지금 'KafkaListenerContainerFactory'Bean이 제대로 등록되지 않았잖아 ?"
난 제대로 한거 같은데 얘는 늘 옳고 난 늘 틀리는거니깐 고쳐보자


@Configuration
public class KafkaConfig {

    @Bean
    public ObjectMapper objectMapper() {
        return new ObjectMapper()
                .registerModule(new JavaTimeModule())
                .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, ErrandStatusMessage> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, ErrandStatusMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        // 에러 처리 및 재시도 정책 설정
        factory.setCommonErrorHandler(new DefaultErrorHandler(
                new DeadLetterPublishingRecoverer(kafkaTemplate()),
                new FixedBackOff(1000L, 3L)  // 1초 간격으로 3번 재시도
        ));

        // 배치 리스너 설정
        factory.setBatchListener(true);

        return factory;
    }

굉장히 쉽게 해결 할 수 있다.

@Bean(name = "KafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, ErrandStatusMessage> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, ErrandStatusMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        // 에러 처리 및 재시도 정책 설정
        factory.setCommonErrorHandler(new DefaultErrorHandler(
                new DeadLetterPublishingRecoverer(kafkaTemplate()),
                new FixedBackOff(1000L, 3L)  // 1초 간격으로 3번 재시도
        ));

        // 배치 리스너 설정
        factory.setBatchListener(true);

        return factory;
    }

@Bean(name = "KafkaListenerContainerFactory")

고작 @Bean에 이렇게 name = 으로 명시해주는 것 만으로도 해결이 되었다.
기분 나쁘다 솔직히 이거 안해줘도 된건데 갑자기 왜? 지만 아쉬운건 나니깐 적어주고 남겨둔다.

두 번째 에러

  1. Kafka 연결 실패
    WARN org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-absurdity-group-1, groupId=absurdity-group] Connection to node -1 (localhost/127.0.0.1:29092) could not be established. Node may not be available.
    
Bean을 처리하니 Kafka에 연결을 못하고있는 상황

2. 설정 불일치

ConsumerConfig values:
bootstrap.servers = [localhost:29092]


application.yml 파일에서 port 불일치가 발생한다 ?
bootstrap-servers: localhost:29092  # 29092 포트로 변경

docker-compose.yml 의 설정과 다르다고 한다.

KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092

````
PLAINTEXT://kafka:9092: Docker 네트워크 내부 통신용
PLAINTEXT_HOST://localhost:29092: 외부(호스트 머신)에서의 접근용

이러한 설정은:

Docker 컨테이너 간 통신은 9092 포트 사용
호스트 머신(우리 애플리케이션)에서의 접근은 29092 포트 사용

따라서:

Spring Boot 애플리케이션은 localhost:29092로 접속해야 함
Docker 네트워크 내의 다른 서비스들은 kafka:9092로 접속

이번에 DOCKER?

failed to solve: rpc error: code = Unknown desc = failed to solve with frontend dockerfile.v0: failed to read dockerfile: open /var/lib/docker/tmp/buildkit-mount3573631474/Dockerfile: no such file or directory

DockerFile을 찾을 수 없다고 한다.
알고보니 DockerFile XX -> Dockerfile 이 맞다

물론 여전히 안되고 있다.
솔직히 가장 중요한 문제였다..

현재 상황은 Docker 컨테이너가 올라가지 않던 문제는 해결되었는데
여전히 29092 포트를 접속하지 못해 무한 콜이 나오는 상황이다.

마저 해결하고 다음 글에 올리겠습니다.