[스터디] 레디스 스트림

반응형
반응형

오랜만에 플젝으로 돌아왔다. 이번에는 레디스 스트림을 넣어서 동작을 시킬예정이다.
아마 내일부터는 면접준비랑 스터디를 병행하면서 할거 같다.
쉴만큼 쉬었으니 괜찮겠지..
아무튼 오늘 튜터님의 도움으로 레디스 스트림을 이용할 수 있게 되었다.
내가 관가했던 부분은 config쪽을 완료하지 못했었다.
오늘 같은 경우는 스트림만 만들예정이다.
근데 생각해보면 레디스 스트림은 뭘까?? 대기열이라고 생각하면 된다. 대규모 트래픽에서 대기열이라고 할 수 있는건 카프카가 있다.
그러면 왜 카프카가 아니라 레디스 스트림을 선택을 했나?
일단 내 도메인의 특성을 이해를 해야하는데 내 도메인은 외부로 메시지를 보내는 시스템이 아니다. 동일한 도메인에서 대기열 큐를 만드는 시스템이다. 그렇다는건 외부에 메시지를 보내는 카프카는 다소 무거울 수 있다는 뜻이 된다. 이건 내가 카프카와 레디스 스트림 중 선택한 이유고 뭔가 다른 이유가 있을까?

Kafka는 분산 메시징 시스템으로 다른 시스템과의 메시지 통신에 강점을 가지지만,우리 시스템은 내부 모듈 간 비동기 처리가 주된 목적이기 때문에, 오버헤비한 Kafka 대신 Redis Stream으로 충분합니다.
Kafka는 시스템 간 분산 메시지 브로커로서 외부 서비스 연동에 탁월한 성능을 보여주지만, 저희 시스템은 전적으로 내부 서비스 간 이벤트 전달이 목적이기 때문에, 메시지 브로커로 Redis Stream을 선택했습니다. 이는 Redis를 통한 빠른 통신, 가벼운 구성, 낮은 운영 부담 측면에서도 유리한 선택이었습니다.

그리고 아래는 그에 대한 표다.

통신 범위 외부 시스템이 아닌 내부 구성 요소 간 이벤트 전달
인프라 부담 Kafka는 브로커, 토픽, ZooKeeper 등 운영 복잡도 ↑
속도 Redis는 메모리 기반이므로 응답 속도 빠름
일관성 Redis Stream은 ack 기반 처리로 기본적인 신뢰성 제공
도입 비용 Kafka를 쓰기 위한 DevOps, 모니터링, 운영 부하 대비 Redis는 가볍게 구성 가능

그리고 이 밑은 카프카와 레디스 스트림을 gpt에 물어본 결과다.

                 항목                                            Kafka                                                  Redis Stream

기본 철학 대규모 분산 로그 기반 브로커 메모리 기반 경량 메시지 큐
설치 및 구성 복잡 (브로커, ZooKeeper, 설정 필요) 간단 (Redis만 있으면 됨)
운영 난이도 높음 낮음
메시지 처리량 매우 높음 (수십만 msg/sec 이상) 중간 정도 (단일 Redis 인스턴스 기준)
메시지 보존 디스크 기반, 장기 저장 가능 메모리 기반, trimming 필요
컨슈머 모델 파티션 기반, offset 관리 필요 consumer group 기반, ack 방식
내부 시스템 통신 과함 (오버엔지니어링 가능성) 적합 (내부 비동기 전달에 최적)
외부 시스템 통합 매우 적합 한정적
재처리 / 중복 방지 가능 (offset rewind) 가능 (ack 미처리 시 pending 상태)
Spring 통합 풍부 (Spring Cloud Stream 등) 기본 제공 (StreamListenerContainer 등)

그럼에도 불구하고 언제든지 카프카로 변경 가능하게 설계를 해야 한다.
혹시 모르니까.. 메시지 전달을 외부로 할지도 모르기 때문이다.
언제든 바꿀수 있게... 

암튼 내가 왜 카프카가 아닌 레디스 스트림을 선택한 이유에 대해 설명을 했으니 본격적으로 레디스 스트림을 조져보자.

일단 레디스 스트림도 카프카와 마찬가지로 config정보가 필요하다. 그러니까 레디스 스트림도 프로듀서와 컨슈머 개념이 존재한다.
실제로는 뭐라고 부르는지는 모르겠지만 아무튼 비슷한 개념은 존재하는걸로 알고 있다.

일단 config를 만들어보자.

@Configuration
public class RedisConfig {

  @Bean
  public <T> RedisTemplate<String, T> redisTemplate(RedisConnectionFactory factory) {
    RedisTemplate<String, T> template = new RedisTemplate<>();
    template.setConnectionFactory(factory);

    StringRedisSerializer serializer = new StringRedisSerializer();

    template.setKeySerializer(serializer);
    template.setValueSerializer(serializer);

    template.setHashKeySerializer(serializer);
    template.setHashValueSerializer(serializer);

    return template;
  }

}

이게 프로듀서

@Bean
  public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer(
      RedisConnectionFactory factory) {

    var options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
        .builder()
        .pollTimeout(Duration.ofSeconds(1)) // Redis polling 간격
        .build();

    var container = StreamMessageListenerContainer.create(factory, options);

    String streamKey = "mystream";
    String group = "mygroup";

    container.receive(
        Consumer.from(group, UUID.randomUUID().toString()),
        StreamOffset.create(streamKey, ReadOffset.lastConsumed()),
        myStreamListener
    );

    container.start(); // 꼭 호출해야 함

    return container;
  }
}

이게 컨슈머 
고민을 해야 되는 부분이

 MapRecord<String, String, String>

요건데 이거는 무엇을 뜻하는 걸까?

개념                                      설명

MapRecord Redis Stream에서 들어온 한 개의 메시지를 표현하는 객체
K Stream key (String)
HK 필드 key (String)
HV 필드 value (String)
사용 이유 메시지 ID, key, value를 한 객체로 다루기 위함

그러니까 스트림 key 하위에 
H가 Map형태로 들어가게 되어진다.
이걸 카프카로 비교하면

Kafka RecordRedis                                                                                  Stream MapRecord

Topic name Stream key
Offset / Partition RecordId
Message payload (JSON) Map<String, String>

근데 뭐랄까 Map으로 저장하는게 좀 그렇긴한데.. 레디스 스트림은 Map형식으로 들어가는 거 같다.
그럼 이제 마지막으로 

@Bean
public void consumerGroup(StreamMessageListenerContainer<String, MapRecord<String, String, String>> container) {
  container.receive(
      Consumer.from("mygroup", UUID.randomUUID().toString()),
      StreamOffset.create("mystream", ReadOffset.lastConsumed()),
      myStreamListener
  );
}

이거만 분석하면 될거 같다.
컨슈머를 만드는 부분이라고 한다.
일단 이 부분은 조금더 생각을 해야 할거 같구..
일단 gpt가 알려준 config정보를 사용해보자.

@Bean
  public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer(
      StringRedisTemplate redisTemplate,
      RedisConnectionFactory factory) {

    String streamKey = "stream:gathering_join";
    String group = "gathering_group";
    String consumerName = "gathering_consumer_1";

    // Group 생성 로직
    StreamOperations<String, Object, Object> streamOps = redisTemplate.opsForStream();

    if (!Boolean.TRUE.equals(redisTemplate.hasKey(streamKey))) {
      streamOps.add(streamKey, Map.of("init", "init"));
    }

    try {
      streamOps.createGroup(streamKey, ReadOffset.latest(), group);
    } catch (RedisSystemException e) {
      if (!(e.getCause() instanceof RedisBusyException)) {
        throw e;
      }
    }


    var options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
        .builder()
        .pollTimeout(Duration.ofSeconds(1)) // Redis polling 간격
        .build();

    var container = StreamMessageListenerContainer.create(factory, options);

    // 여러가지 consumer가 있을 수 있다.
    //서버 3대
    container.receive(
        Consumer.from(group, consumerName),
        StreamOffset.create(streamKey, ReadOffset.lastConsumed()),
        myStreamListener
    );
    container.start(); // 꼭 호출해야 함

    return container;
  }

}

이건데.. 뭔가 동적으로 할당을 못시키는 느낌이다. 물론 스트림을 사용하는 부분은 이거 밖에 없으니 큰 상관은 없을 거 같다.
이제 실행해보자.

정상적으로 데이터가 들어오는것을 확인했구

redisTemplate.opsForStream().add("stream:gathering_join", member.toMap());

요렇게 stream:gathering_join으로 데이터를 보낸다.
그럼 컨슈머 config에 할당된 정보를 읽고 

@Override
public void onMessage(MapRecord<String, String, String> message) {
  String userId = message.getValue().get("member_name");
  String groupId = message.getValue().get("type");
  System.out.println("✅ Consumed: " + userId + " joined group " + groupId);
}

요렇게 사용이 되어진다고 한다. 근데 생각해보면
요기에 있을 필요는 없다. 애초에 메소드가 하나라 함수형으로 짤수 있다.
그렇다는건 같은 클래스에서 사용이 가능하다는 뜻이된다.

그러니까 결론은 프로듀서는 템플릿을 사용하지만 컨슈머는 그렇지 않고
프로듀서는 조합으로 사용되어지지만 컨슈머는 상속을 이용한다라는 점을 기억해야 할거 같다.

요렇게 레디스 스트림은 마무리짓고 추후 이걸 동적으로 할 수 있는 방안과
MAP이 아니라 JSON은 불가능한지 생각을 해보자.

 

반응형

댓글

Designed by JB FACTORY