카프카 - config 동적 할당 (json 매핑)

반응형
반응형

프로듀서..

@Bean
   public <T> ProducerFactory<String, T> producerFactory() {
	     Map<String, Object> configProps = new HashMap<>();
	     // 중간 구현
       return new DefaultKafkaProducerFactory<>(configProps);
   }

    // 동적으로 만들고
    @Bean
   public <T> KafkaTemplate<String, T> kafkaTemplate(ProducerFactory<String, T> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }

    // 구체화 시키고
    @Bean
    public KafkaTemplate<String, DemoCreateEvent> demoCreateEventKafkaTemplate() {
        return kafkaTemplate(producerFactory());
    }

이렇게 만들게 되면 구체화쪽만 복사해서 객체를 매핑 시킬 수 있다.
중간 구현에 있는 코드는 프로듀서의 속성을 넣으면 된다.

컨슈머...

    @Bean
    public <T> ConsumerFactory<String, T> kafkaConsumer() {
        Map<String, Object> consumerConfig = new HashMap<>();
        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
      
        // 아래 두개는 등록하지 않아도 정상적으로 구동은 되어짐
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return new DefaultKafkaConsumerFactory<>(consumerConfig);
    }
    // 이거 없으면 json으로 역직렬화가 안됨
    @Bean
    public StringJsonMessageConverter jsonConverter() {
        return new StringJsonMessageConverter();
    }

    // container 등록
    @Bean
    public <T> ConcurrentKafkaListenerContainerFactory<String, T> consumerCommonFiled() {
        ConcurrentKafkaListenerContainerFactory<String, T> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumer());
        return factory;
    }

    // 구체화
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, CreateOrderEvent> containerFactory() {
        return consumerCommonFiled();
    }

이거 또한 구체화쪽을 복하해서 각 객체에 할당 시켜주면 된다.

@Bean
public StringJsonMessageConverter jsonConverter() {
    return new StringJsonMessageConverter();
}

설정하다가 하두 안 돼서 확인해보니 요 객체를 추가를 해줘야한다는 사실을 알았다.
그리고 StringJsonMessageConverter을 이용했는데 다른걸 써도 되지 않을까 생각이 든다.
다른 클래스를 사용해도 똑같지 않을까? 물론 저거를 상속한 (? super T) 얘네들은 가능할듯..

처음에 

consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class);

이걸로 역직렬화가 된다고 생각했지만, 이상하게도 이걸로는 역직렬화는 되지 않았다.
얘네 같은 경우에는 강제로 json으로 컨버터한다고 작성해두면 필요가 없는걸로 파악 되었다.
지금은 모르지만 추후에 얘네가 왜 필요한지 알 수 있을 있을지도 모르겠다.

그리고 주의해야 할점은
프로듀서와 컨슈머 둘다 
BOOTSTRAP_SERVERS_CONFIG를 등록 시켜줘야 한다.
아마 따로 동작이 되어지는 느낌이다. 아무튼 최대한 중복 코드를 지울려고 노력하였다.

프로듀서와 컨슈머 코드를 읽어보면 조금 이상한 점이 있는데
물론 큰 문제는 되지 않는다. 그 이상한 점이라는건 프로듀서는 factory를 받고 있지만 컨슈머쪽은 전혀 그렇지 않는다는 점이다. 
이 점이 제일 아쉽다. 하지만 내가 원하는건 동적 할당이기 때문에 이렇게 하였다.

마지막으로 내가 yml으로 작성하지않고 자바로만 작성한 이유는 이렇게도 충분히 가능하다는것을 확인했기 때문이다.
굳이 yml은 필요없을거 같다.
다만 위 부트스크랩 정보처럼 간단한 정보는 yml로 빼서 사용하는것이 어쩌면 더 효율적일지도 모르겠다는 생각이 들었다.
yml로 빼는 순간 한번만 입력해도 되는거 같기 때문이다.

 

반응형

'국비지원 (스파르타)' 카테고리의 다른 글

ACID vs BASE  (0) 2025.03.07
카프카는 도대체 어떻게 사용하는 걸까?  (2) 2025.03.07
스프링 시큐리티(인증)  (0) 2025.03.05
스프링 이벤트  (0) 2025.03.04
DDD와 샤딩(feat.gpt)  (0) 2025.02.28

댓글

Designed by JB FACTORY