카프카 - config 동적 할당 (json 매핑)
- 국비지원 (스파르타)
- 2025. 3. 11. 09:52
프로듀서..
@Bean
public <T> ProducerFactory<String, T> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
// 중간 구현
return new DefaultKafkaProducerFactory<>(configProps);
}
// 동적으로 만들고
@Bean
public <T> KafkaTemplate<String, T> kafkaTemplate(ProducerFactory<String, T> producerFactory) {
return new KafkaTemplate<>(producerFactory);
}
// 구체화 시키고
@Bean
public KafkaTemplate<String, DemoCreateEvent> demoCreateEventKafkaTemplate() {
return kafkaTemplate(producerFactory());
}
이렇게 만들게 되면 구체화쪽만 복사해서 객체를 매핑 시킬 수 있다.
중간 구현에 있는 코드는 프로듀서의 속성을 넣으면 된다.
컨슈머...
@Bean
public <T> ConsumerFactory<String, T> kafkaConsumer() {
Map<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// 아래 두개는 등록하지 않아도 정상적으로 구동은 되어짐
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaConsumerFactory<>(consumerConfig);
}
// 이거 없으면 json으로 역직렬화가 안됨
@Bean
public StringJsonMessageConverter jsonConverter() {
return new StringJsonMessageConverter();
}
// container 등록
@Bean
public <T> ConcurrentKafkaListenerContainerFactory<String, T> consumerCommonFiled() {
ConcurrentKafkaListenerContainerFactory<String, T> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(kafkaConsumer());
return factory;
}
// 구체화
@Bean
public ConcurrentKafkaListenerContainerFactory<String, CreateOrderEvent> containerFactory() {
return consumerCommonFiled();
}
이거 또한 구체화쪽을 복하해서 각 객체에 할당 시켜주면 된다.
@Bean
public StringJsonMessageConverter jsonConverter() {
return new StringJsonMessageConverter();
}
설정하다가 하두 안 돼서 확인해보니 요 객체를 추가를 해줘야한다는 사실을 알았다.
그리고 StringJsonMessageConverter을 이용했는데 다른걸 써도 되지 않을까 생각이 든다.
다른 클래스를 사용해도 똑같지 않을까? 물론 저거를 상속한 (? super T) 얘네들은 가능할듯..
처음에
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class);
이걸로 역직렬화가 된다고 생각했지만, 이상하게도 이걸로는 역직렬화는 되지 않았다.
얘네 같은 경우에는 강제로 json으로 컨버터한다고 작성해두면 필요가 없는걸로 파악 되었다.
지금은 모르지만 추후에 얘네가 왜 필요한지 알 수 있을 있을지도 모르겠다.
그리고 주의해야 할점은
프로듀서와 컨슈머 둘다
BOOTSTRAP_SERVERS_CONFIG를 등록 시켜줘야 한다.
아마 따로 동작이 되어지는 느낌이다. 아무튼 최대한 중복 코드를 지울려고 노력하였다.
프로듀서와 컨슈머 코드를 읽어보면 조금 이상한 점이 있는데
물론 큰 문제는 되지 않는다. 그 이상한 점이라는건 프로듀서는 factory를 받고 있지만 컨슈머쪽은 전혀 그렇지 않는다는 점이다.
이 점이 제일 아쉽다. 하지만 내가 원하는건 동적 할당이기 때문에 이렇게 하였다.
마지막으로 내가 yml으로 작성하지않고 자바로만 작성한 이유는 이렇게도 충분히 가능하다는것을 확인했기 때문이다.
굳이 yml은 필요없을거 같다.
다만 위 부트스크랩 정보처럼 간단한 정보는 yml로 빼서 사용하는것이 어쩌면 더 효율적일지도 모르겠다는 생각이 들었다.
yml로 빼는 순간 한번만 입력해도 되는거 같기 때문이다.
'국비지원 (스파르타)' 카테고리의 다른 글
ACID vs BASE (0) | 2025.03.07 |
---|---|
카프카는 도대체 어떻게 사용하는 걸까? (2) | 2025.03.07 |
스프링 시큐리티(인증) (0) | 2025.03.05 |
스프링 이벤트 (0) | 2025.03.04 |
DDD와 샤딩(feat.gpt) (0) | 2025.02.28 |