본문 바로가기
BackEnd/Kafka

[Kafka] partition, consumer, producer 관계 (1)

by 하용권 2023. 2. 8.

parition, consumer, producer 가 정확히 어떻게 작동하는 지 궁금하여 테스트를 해보게 되었다.

 

Spring test와 @EmbeddedKafka를 이용하여 테스트 했다.

 

 

이는 producer이다. key는 random으로 아무런 값이나 보내게 된다.

@Async
//항상 1 전송
public void prodcue1(String mm){
    kafkaTemplate.send("fcm",Integer.toString(new Random().nextInt()), mm);
    System.out.println(String.format("pro %s %s", mm, Thread.currentThread().getId()));
}

@Async
//항상 2 전송
public void produce2(String mm){
    kafkaTemplate.send("fcm", Integer.toString(new Random().nextInt()), mm);
    System.out.println(String.format("pro %s %s", mm, Thread.currentThread().getId()));
}

여기서 궁금한 점은, 비동기로 작동하는 producer 2개가 같은 parition에 접근하는 경우 데이터가 정상적으로 들어가는지이다.

 

이는 producer가 작동하는 과정이다.

for(int i = 0; i < 20; i++){
	if(i%2==0) producer.produce1("1");
    else 	   producer.produce("2");
 }

 

결과는 다음과 같다.

pro 1 139
pro 1 133
pro 2 138
pro 2 136
pro 1 135
pro 1 137
pro 2 134
pro 2 136
pro 1 135
pro 2 133
pro 1 138
pro 2 140
pro 2 133
pro 1 135
pro 2 136
pro 1 134
pro 1 139
pro 2 137
pro 2 140
pro 1 138

1 10번, 2 10번 올바르게 잘 보내고 있다. 

 

아래는 consumer 설정이다.

//Consumer1.java
@KafkaListener(topics = "fcm", groupId = "fcm")
public void listen(ConsumerRecords<String, String> mm){
    for(ConsumerRecord<String, String> c : mm) {
        System.out.println(String.format("listen-1 partition : %s value : %s threadId :%s", c.partition(), c.value(),Thread.currentThread().getId()));
    }
}


//Consumer2.java
@KafkaListener(topics = "fcm", groupId = "fcm")
public void listen(ConsumerRecords<String, String> mm){
    for(ConsumerRecord<String, String> c : mm) {
        System.out.println(String.format("listen-2 partition : %s value : %s threadId :%s", c.partition(), c.value(),Thread.currentThread().getId()));
    }
}

동일한 주제와 동일한 그룹으로 메시지를 consume한다.

그리고 메세지가 온 parition과 메시지 값, 그리고 메시지의 threadId(consumer)를 확인한다.

 

같은 partition에 2개의 consumer가 존재할 경우에는 다음과 같이 작동한다.

listen-1 partition : 0 value : 2 threadId :127
listen-1 partition : 0 value : 2 threadId :127
listen-1 partition : 0 value : 2 threadId :127
listen-1 partition : 0 value : 1 threadId :127
listen-1 partition : 0 value : 1 threadId :127
listen-1 partition : 0 value : 1 threadId :127
listen-1 partition : 0 value : 2 threadId :127
listen-1 partition : 0 value : 1 threadId :127
listen-1 partition : 0 value : 1 threadId :127
listen-1 partition : 0 value : 2 threadId :127
listen-1 partition : 0 value : 2 threadId :127
listen-1 partition : 0 value : 1 threadId :127
listen-1 partition : 0 value : 2 threadId :127
listen-1 partition : 0 value : 1 threadId :127
listen-1 partition : 0 value : 1 threadId :127
listen-1 partition : 0 value : 2 threadId :127
listen-1 partition : 0 value : 1 threadId :127
listen-1 partition : 0 value : 2 threadId :127
listen-1 partition : 0 value : 1 threadId :127
listen-1 partition : 0 value : 2 threadId :127

보면, consumer 하나만 작동하여 메시지를 출력하는 것을 확인할 수 있다. 즉, Consumer2는 아예 작동을 하지 않는다.

 

partition이 2개로 늘어난 다면?

listen-1 partition : 0 value : 2 threadId :127
listen-2 partition : 1 value : 2 threadId :129
listen-2 partition : 1 value : 1 threadId :129
listen-1 partition : 0 value : 1 threadId :127
listen-2 partition : 1 value : 1 threadId :129
listen-1 partition : 0 value : 1 threadId :127
listen-2 partition : 1 value : 2 threadId :129
listen-1 partition : 0 value : 2 threadId :127
listen-2 partition : 1 value : 2 threadId :129
listen-1 partition : 0 value : 2 threadId :127
listen-2 partition : 1 value : 1 threadId :129
listen-1 partition : 0 value : 1 threadId :127
listen-2 partition : 1 value : 2 threadId :129
listen-1 partition : 0 value : 2 threadId :127
listen-1 partition : 0 value : 2 threadId :127
listen-2 partition : 1 value : 1 threadId :129
listen-2 partition : 1 value : 1 threadId :129
listen-2 partition : 1 value : 1 threadId :129
listen-2 partition : 1 value : 2 threadId :129
listen-2 partition : 1 value : 1 threadId :129

다음과 같이 consumer 2개 전부 잘 작동한느 것을 확인할 수 있다. 어느 partition 메시지를 전송할지는 producer의 key에 따라서 결정된다. 여러 partition이 섞여있는 것처럼 보인다.

 

하지만 여기서 핵심은 trehadId 127(Consumer1)은 parition 0에만 접근을 하고, threadId 129(Consumer2)는 partition1에만 접근하는 것을 확인할 수 있다. kafka 자체에서 자동으로 이렇게 할당을 해준다. 이를 rebalancing이라고 한다.

이 전략에 대해서는 따로 공부할 필요성이 있다.

 

2개의 parition에 하나의 consumer만 있다면 어떻게 될까?

listen-1 partition : 0 value : 2 threadId :127
listen-1 partition : 0 value : 2 threadId :127
listen-1 partition : 0 value : 1 threadId :127
listen-1 partition : 0 value : 1 threadId :127
listen-1 partition : 0 value : 1 threadId :127
listen-1 partition : 0 value : 2 threadId :127
listen-1 partition : 0 value : 1 threadId :127
listen-1 partition : 0 value : 2 threadId :127
listen-1 partition : 0 value : 1 threadId :127
listen-1 partition : 1 value : 1 threadId :127
listen-1 partition : 1 value : 1 threadId :127
listen-1 partition : 1 value : 1 threadId :127
listen-1 partition : 1 value : 2 threadId :127
listen-1 partition : 1 value : 2 threadId :127
listen-1 partition : 1 value : 2 threadId :127
listen-1 partition : 1 value : 2 threadId :127
listen-1 partition : 1 value : 1 threadId :127
listen-1 partition : 1 value : 1 threadId :127
listen-1 partition : 1 value : 2 threadId :127
listen-1 partition : 1 value : 2 threadId :127

이처럼 하나의 consumer가 다 처리하게 된다.

 

 

다음에는 embedded kafka가 아닌 local에 kafka를 작동시킬 예정이다.

또한 하나의 서버가 아닌 두 대의 서버를 이용하여 실험을 해본다.

 

반응형