본문 바로가기
BackEnd/spring

[Spring] redis stream

by 하용권 2024. 12. 17.

최근 회사에서 로깅 로직 때문에 성능 저하가 있었습니다.

요청 -> 로그 DB 적재 형태로 되어 있다 보니 트래픽이 많이 발생하게 되면 connection pool , insert 성능 등 문제가 있었습니다.

 

그래서 로그를 DB에 바로 적재하는 방식이 아니라 중간에 로그를 모아서, bulk로 insert 하도록 로직을 수정하려고 했습니다.

이를 위해 redis stream을 이용했습니다.

 

 

1. redis stream이란?

redis에는 다양한 자료 구조가 있습니다.

이중에서 이벤트 소싱처럼 쓰기 적절한 자료구조로 list, pub/sub, stream 이 있습니다.

 

pub/sub의 경우에는 중복 없이 데이터를 동시에 처리할 수가 없습니다. 예를 들면, test_topic이라는 topic이 있고, 이를 여러 consumer들이 구독하고 있다고 가정해 보겠습니다. 이 경우에 서로 다른 consumer가 같은 데이터를 들고 갈 수 있습니다. 또한 subcriber가 없으면 이전에 들어온 데이터들은 그대로 버려지게 됩니다.

 

하지만 stream은 좀 다릅니다.

consumer group이라는 것이 있고, 이 안에 여러 consumer들이 있을 수 있습니다. 같은 consumer group 안의 consumer들은 중복된 데이터를 처리하지 않고, 서로 각자 다른 데이터를 가져옵니다. 이의 장점은 만약 데이터가 produce 되는 속도가 너무 빠르게 될 경우 consumer만 추가해서 처리를 할 수 있습니다.

또한 중간에 실패할 경우에는 pending 시켜서 이후에 다시 처리한다든가 retry 후 데이터를 버리는 등 다양한 전략을 선택할 수 있습니다.

 

그래서 redis stream을 이용했습니다.

 

2. redis stream 사용법 및 주의 사항

redis 는 인메모리 DB입니다. 그렇기 때문에 데이터를 너무 많이 적재할 경우 메모리에 부하가 발생할 수 있습니다. 또한 big key 문제가 발생할 수 있으므로 적절한 대안을 줘야 합니다.

 

다행히도 xadd라는 명령어를 통해서 해결할 수 있습니다.

 

XAddOptions options = XAddOptions.maxlen(1).approximateTrimming(false);

RecordId messageId = streamOps.add(key, Collections.singletonMap(hashKey, newValue), options);

 

만약 maxlen 보다 현재 stream에 데이터의 개수가 더 많다면, 오래된 것부터 지웁니다.

approxiateTrimming옵션 같은 경우에는 false로 하면 정확하게 데이터의 개수를 유지할 수 있습니다. 하지만 이는 비용이 비싸다고 합니다. 그 이유는 radix tree를 이용하고 있기 때문입니다.

 

대신 true로 설정을 하면 1000개 보다 약간 더 많은 데이터가 적재될 수는 있지만, 더 효율적이라고 합니다.

 

 

(만약 spring -redis의 버전이 2.7.x라면, streamOps.add(...) 에 XAddOptions를 인자로 주는 메서드가 존재하지 않을 수 있습니다. 이 경우에는 trim을 이용해서 데이터를 지우고 삽입하는 등의 방법이 필요합니다. 제가 알기로는 3.4부터는 해당 메서드를 지원해 주는 것으로 알고 있습니다.)

 

 

 

두 번째 문제점은 데이터를 가져가고 ack을 주지 않으면 해당 메시지는 pending 상태가 됩니다.

pending 된 데이터를 관리하기 위해서 PEL(Pending Entries List)에 해당 정보를 저장하고 있습니다.

ack 이 된다면, 해당 데이터를 관리할 필요가 없기 때문에 지웁니다.

(참고 : https://redis.io/docs/latest/commands/xpending/)

 

만약 pending 관리가 필요가 없다면, autoack을 통해서 데이터를 읽자마자 바로 ack 상태로 만들 수 있습니다.

 

3. 의문점

 

spring-redis docs에서 해당 내용이 있습니다.

 

StreamMessageListenerContainerOptions<String, ObjectRecord<String, Long>> containerOptions = StreamMessageListenerContainerOptions
	.builder().batchSize(3).pollTimeout(Duration.ofMillis(100)).targetType(Long.class).build();

이는 읽을 때, 배치 사이즈 만큼 한 번에 데이터를 읽으라는 소리입니다. 만약 stream에 batch size 만큼의 데이터가 없다면, pollTimeout 동안 기다린 후에 데이터를 가져옵니다.

 

저는 이러한 옵션이 있어서, StreamListener에서 데이터를 리스트로 한 번에 들고 올 수 있을 줄 알았습니다.

실제 kafka listener의 경우에는 이러한 기능이 있습니다.

@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory")
public void listen1(List<Message<?>> list) {
    ...
}

(참고 https://docs.spring.io/spring-kafka/reference/kafka/receiving-messages/listener-annotation.html#batch-listeners)

 

 

하지만 StreamListener는 List로 한 번에 데이터를 들고 오지 못합니다.

class ExampleStreamListener implements StreamListener<String, MapRecord<String, String, String>> {

	@Override
        //here
	public void onMessage(MapRecord<String, String, String> message) {

		System.out.println("MessageId: " + message.getId());
		System.out.println("Stream: " + message.getStream());
		System.out.println("Body: " + message.getValue());
	}
}

 

이런 식으로 되어 있고, onMessage 함수를 통해서 메시지를 하나씩 처리합니다.

class StreamPollTask<K, V extends Record<K, ?>> implements Task {
....
    private void deserializeAndEmitRecords(List<ByteRecord> records) {
    
        for (ByteRecord raw : records) {
    
	    try {
    
                pollState.updateReadOffset(raw.getId().getValue());
                V record = convertRecord(raw);
                listener.onMessage(record);
	    } catch (RuntimeException ex) {
    
                if (cancelSubscriptionOnError.test(ex)) {
    
	            cancel();
	            errorHandler.handleError(ex);
            
	            return;
                }
            
                errorHandler.handleError(ex);
            }
        }
    }

}

그리고 이를 이용해서 데이터를 전달하게 됩니다.

간단하게 설명을 하면, List<ByteRecord>는 redis에서 읽어온 데이터입니다.

batch size 만큼 한 번에 데이터를 읽어오고, 이를 for 문을 통해서 lisener.onMessage(...)에 데이터를 하나씩 전달하게 됩니다.

 

이러한 로직이 doLoop() 라는 메서드 내에서 매번 실행이 되게 됩니다.

저는 왜 데이터를 한 번에 전달하지 않고 하나씩 전달하는지 궁금해서 github issue에 해당 내용을 질문했습니다.

https://github.com/spring-projects/spring-data-redis/issues/3078

 

 

 

StreamListener를 이용하면 제가 처음에 말했던 로깅 문제는 해결하기 어렵습니다.

 

redis에서 데이터를 한 번에 많이 가져온 후 이를 db에 bulk insert 하는 방식을 구현하려면 추가 구현이 필요합니다. (데이터를 클래스 필드에 저장 후에 일정 개수 이상이 되면 DB에 insert 하는 등)

 

또한 사용자가 이용하지 않는 시간대에는 로그가 발생하지 않을 수 있습니다. 이 경우에는 데이터를 계속 모으고 있다가 일정 시간이 지나면 DB에 insert 하는 로직이 필요합니다(StreamMessageListenerContainerOptions의 pollTimeOut처럼)

만약 stream에서 읽을 데이터가 존재하지 않는다면, for문을 타지 않기 때문에 onMessage(...) 메서드가 실행이 되지 않습니다. 즉, 몇 시간 전의 요청이더라도 DB에 실제로는 데이터가 적재되지 않을 수 있습니다.

 

 

그래서 redisTemplate을 이용해서 수동으로 데이터를 가져오도록 변경했습니다.

 

 

 

 

 

 

 

 

 

참고 자료 

https://techblog.lycorp.co.jp/ko/building-a-messaging-queuing-system-with-redis-streams

https://redis.io/docs/latest/develop/data-types/streams/

https://dev.gmarket.com/113

https://docs.spring.io/spring-kafka/reference/kafka/receiving-messages/listener-annotation.html#batch-listeners

반응형