카프카 of (프로듀서 and 컨슈머)
- 루퍼스/8주차
- 2025. 9. 5. 16:49
배경
카프카는 어떤 목적으로 사용이 되어질까? 왜 하필 카프카일까?
사실 반드시 카프카여야 된다는 이유는 없다고 생각한다. 카프카는 우리가 알기에는 메시지 큐정도만 알고 있는데
사실 단순한 메시지 큐는 아니다. 정확히 말하면 분산 메세지 스트리밍 플랫폼이다.
카프카에 대해서는 엄청나게 공부할것이 많지만 여기에서는 두 가지만 이야기를 할 예정이다.
프로듀서와컨슈머.. 프로듀서에서 메시지를 발송하고 컨슈머쪽에서 받는 그럼 시스템이다.구현하면서 생각보다 쉽지 않음을 느꼇다.
어떤 부분에서 고민했는지 생각해보자.
프로듀서
프로듀서는 다음과 같이 작성을 하였다.
public class LikeKafkaEventPublisher {
private final KafkaTemplate<Object, Object> kafkaTemplate;
private final MessageConverter converter;
private final static String AGGREGATE_TOPIC = "PRODUCT_LIKE_CHANGED_V1";
public void aggregate(Long productId, int data) {
log.info(" productId: {}, data: {}", productId, data);
String key = LocalDate.now().toEpochDay() + ":" + productId;
String message = converter.convert(new Message("METRICS", converter.convert(new LikeMetricsMessage(productId, data))));
kafkaTemplate.send(AGGREGATE_TOPIC, key, message);
}
어려웠던점은 3가지로 정리할 수 있다.
1. 토픽 이름 작성
2. 파티션 키에는 어떤 값이 작성되어야 하나
3. 컨슈머로 전송되는 '메시지'는 어떤것을 중점적으로 생각을 해야 하는가?
첫번째,
지금 하는 행위가 어떤 작업을 하는지 알야된다고 생각한다. 내가 생각할때 토픽은 프로듀서에서 컨슈머를 메시지를 보낼때, 거기에 맞는 제목이라고 생각이 든다. 그래서 무수히 많은 데이터를 보내는데 그것들에대한 기준을 작성하면 된다고 생각한다.
이건 마치 지금 내가 블로그글을 작성하는 행위와 똑같다고 생각한다. 블로그글이 메시지고 블로그 제목이 토픽이다. 즉, 토픽은 어떤 내용으로 전송이 되어질지 작성하면 된다고 생각한다.
나는 토픽 이름을 PRODUCT_LIKE_CHANGED_V1이라고 했다. 이를 톨해 상품의 좋아요의 변화 추이를 확인하는것을 알 수가 있다.
V1은 버전을 관리하기 위해 작성이 되었다.
두번째는
카프카는 메시지를 파티션이라는 공간으로 구분해서 전송할 수 있는데 그거에 대한 키를 작성한다고 생각하면 된다.
주의할점은 파티션을 작성할때
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, @Nullable V data)
파티션을 직접 지정해서 전송을 할 수 있다. 하지만 저 부분은 카프카의 실 파티션을 뜻하는것이다. 결국, 운영쪽 데이터라고 생각하면 된다.
직접 지정해서 전송하는 행위는 운영쪽에 영향을 준다는 의미이기때문에 사용하는것을 권고하고 있다는걸로 알고 있다.
그러면 우리는 어디에다가 파티션을 지정할 수 있을까? 바로 키라는것을 통해지정할 수 있다. 여기서 키는 파티션 키를 뜻한다.
파티션 키는 파티션을 해쉬로 매핑하는 걸로 알고 있다. 주의할점은 절대로 int나 long을 키로 사용하게 되면 카프카를 통해 전송이 되지 않을 수 있다.(이점 때문에 데이터가 ui로 노출이 되지 않아 당황했다.)
아무튼 내가 생각할때 파티션 키는 어떤 부분들로 나눌수 있는지가 중요하다고 생각한다.
여기에서는 LocalDate.now().toEpochDay() + ":" + productId로 지정했다. 이렇게 지정한 이유는 오늘날짜와 상품ID로 지정했다. 이렇게 지정한 이유는 저 부분은 집계인데 일별로 지정하게 되어진다. 이걸 통해 집계가 1이 추가가 되어도 언제 어떤것의 증가가 되었는지 쉽게 알 수 있다고 생각한다.
마지막,
처음에 개발을 했을 때, 데이터만 발송하였다. 이 말이 무슨말이냐면,
kafkaTemplate.send(AGGREGATE_TOPIC, key, 1)
요런식으로 int나 long같은 타입데이터로만 발송하였다. 이렇게 하니 문제가 있었다. 에러가 발생하고 그런것은 아니다.
다만 저 데이터로 어떤 목적으로 발송을 하는지 불분명하다.
그래서 어떻게 하면 효과적으로 발송할 수 있을지 고민했다.
나는 다음과 같이 작성을 하였다.
public record Message(
String eventId, // 멱등성 체크
MessageType type, // 타입 체크
String payload, // 페이로드
Long eventTime // 언제 발송하였는지
)
이벤트 ID는 멱등성을 위해 작성한 키로 만약에 메시지가 두번 발송이 되어지면 이 값은 동일하기 때문에 이 값을 통해 처리할 수 있다.
그 다음은 타입인데 이건 이 메시지가 어떤 타입인지 정리를 해놓았다. 이 프로젝트에서는 총 3가지 방법을 제공하였다.
METRICS, EVICT, LOG
집계, 캐시 해제, 로그 이렇게 3개를 작성하였다.
이걸 구분하기위해 작성하였다. 그리고 페이로드인데 어떤 정보로 발송을 할지 정의를 해놓았다. 내가 생각할때 메시지는 컨슈머가 이 메시지를 보고 판단할 근거가 있어야 한다고 생각한다.
컨슈머
컨슈머는 다음과 같이 작성하였다.
@KafkaListener(
topics = {
"${aggregate-kafka.like.topic-name}",
},
groupId = "${aggregate-kafka.group-id}",
containerFactory = KafkaConfig.BATCH_LISTENER
)
public void aggregateLikesListener(
List<ConsumerRecord<String, String>> messages,
Acknowledgment acknowledgment
) {
for (ConsumerRecord<String, String> message : messages) {
factory.getStrategy(MetricsMethod.LIKES).process(message.value());
}
acknowledgment.acknowledge();
}
생각보다 컨슈머는 프로듀서에서 메시지를 전달하고 받는과정에서 깊게 고민한것은 딱히 없었던거 같다.
아무래도 메시지만 신경을 쓰면 되기 때문이라 생각이 든다.
또, 아직 구현하거나 작업할것이 남아있는것도 크다. 지금은 간단하게 구현만 해놓은 상태다.
acknowledgment.acknowledge();
이 부분을 통해, 커밋이 진행이 되어진다.
At Least Once/ At Most Once
이거에 대해 고민해보자.
이들을 하기 위해서는 ask와 idempotence의 설정을 해야하지만 지금은 하지않은 상태다. 하지 않는 이유는 위 3가지에 대해서 각자 어떻게 처리할지가 조금씩 달라질 수 있기 때문에 사용하지는 않았다. 어떤 특징이 있고 어떻게 개발 할 수 있을지 생각해보자.
At Least Once
최소한번 가능하라는 뜻이다. 최소 한번이라는건 여러번이 들어와도 괜찮다는 뜻이다. 소실과 중복에 대해 고민해보면 이는 데이터가 소실은 허용하지 않는다는 뜻이된다. 왜냐하면 소실이 된다는건 최소 한번이 아니라는 뜻이 된다. 일단 데이터는 들어왔으니 데이터가 중복이 되어도 괜찮다고 생각이 든다. 그러면 집계, 캐시, 로그중에서 어떤 정보가 여기에 해당이 될까?
내가 생각할때, 집계는 어찌되었든 처리가 되어야 한다고 생각한다.
데이터 하나가 추가되었는데 그 내용이 중복처리되어지는게 사용자 입장에서는 괜찮다고 생각이 든다.
하지만 좋아요는 차감이 존재한다. 그렇기 때문에 마이너스가 되지 않게 멱등성을 체크해야 한다고 생각한다.
(나머지는 단순히 증가되는 형태이기 때문에 상관없다고 생각한다.)
그러면 어떻게 할 수 있을까?
프로듀서 입장에서 생각해보자.
프로듀서는 메시지의 유실을 생각해야하는데 이를 체크할 수 있는 부분이
acks라고 한다. 0,1,all이 있는데 모든 내용을 설명하면 At Least Once에 해당되지 않을 수 있기 때문에 넘어가자.
여기에는 ask=1과 ask=all을 설정할 수 있다고 한다.
하지만 1은 리더가 저장된 시점에서 성공으로 간주 → 팔로워 복제 전에 리더 죽으면 일부 유실 가능, 하지만 재전송하면 At Least Once 달성.이라고 한다. 결국 재전송으로 처리할 수 있다면 1 재전송을 할필요가 없다면 all이라고 생각한다.
하지만, 재전송을 해도 소실을 100%없앨 수 있는것은 아니라고 생각한다. 그렇기 때문에 ask는 all이라고 생각한다.
이제 컨슈머 입장에서 생각해보자.
컨슈머는 데이터가 들어오고 이를 처리한다음에 커밋을 해야 한다고 한다.
커밋은 오프셋 확인을 뜻하며, 이제 이 글은 이제 읽었다는 뜻이 된거라 생각한다.
@Bean
public KafkaTemplate<Object, Object> kafkaAtLeastTemplate(KafkaProperties kafkaProperties) {
Map<String, Object> props = new HashMap<>(kafkaProperties.buildProducerProperties());
props.put(ProducerConfig.ACKS_CONFIG, "all"); // 덮어쓰기
ProducerFactory<Object, Object> producerFactory = new DefaultKafkaProducerFactory<>(props);
return new KafkaTemplate<>(producerFactory);
}
요렇게 하면 덮어쓰기가 된다고 한다.
그럼 이 내용을 가지고 한번 적용해보자.
UI를 확인해보면
생각보다 잘 처리다..
찾아보니 랙이 남아있어도 At Least는 지연이 있어도 크게 상관없다고 생각한다. 나중에 처리되면 되지 않나 생각이 든다.
At Most Once
이건 최대 한번이라는 뜻이다. 최대 한번은 언젠가 성공한다는 마음가짐으로 진행이 된다고 생각한다.
위 예시로 들면, 로그가 여기에 해당이 된다고 생각한다.
로그는 유실은 되어도 중복은 되면 안된다고 생각한다 중복이 되는 순간 어떤 로그의 데이터인지 모르기때문이라 생각이 든다.
프로듀서는 유실이 되어도 되기때문에 ask를 0으로 설정한다.
- 프로듀서는 전송만 하고 브로커 응답을 기다리지 않음.
- 성능은 빠르지만 메시지 유실 가능성 큼.
이라고 한다. 그렇다면 컨슈머는 어떨까?
@KafkaListener(
topics = "${logging-kafka.like.topic-name}",
groupId = "${logging-kafka.group-id}",
containerFactory = KafkaConfig.BATCH_LISTENER
)
public void logListener(
List<ConsumerRecord<String, String>> messages,
Acknowledgment acknowledgment
) {
acknowledgment.acknowledge();
for (ConsumerRecord<String, String> message : messages) {
service.insert(message.value());
}
}
}
처리가 완료되기때문에 카프카 입장에서는 완료라 생각한다 그리고 나서 진행이 되기 때문에 굉장히 빠를거라 예상한다.
마무리
카프카의 프로듀서와 컨슈머에 대해 학습을 하였다. 다만 프로듀서와 컨슈머는 카프카의 빙산의 일각에 불과하다.
그렇다고 해서 프로듀서와 컨슈머에대해 모든 내용을 학습하지는 못했다. 글을 읽어보면 알 수 있듯이 한 쪽에 치우쳐있는 듯한 느낌을 많이 받았다. 이 글에는 데이터가 유실이 되면 어떻게 처리가 되었는지에 작성이 되어있지는 않았다. 그리고 멱등 체크도 하지 않았다. At 시리즈중에서 정확히 한번은 설명하지 않았다. 그 이유는 정확히 한번해야 되는 경우는 생각보다 얼마 없다고 생각했기 때문이다.
최소 한번, 최대 한번으로도 충분히 해결할 수 있지 않을까 생각이 든다.