이번에는 라인 유튜브에 올라와있는 것을 보고 정리한 글입니다.

 

틀린 내용이 있을 수도 있습니다. 틀린 내용이 있다면, 알려주시면 감사하겠습니다.

 

 

 

문제 상황

 

kafka broker를 rolling restart 했을 때,

message produce가 실패하는 현상 발생 했음.

-> produce reqeust timeout이 발생했다고 함.

 

restart 했던 broker의 response time은 정상이였지만, producer의 request latency가 거의 16초나 되었다고 함.

 

 

왜 이런 현상이 나타났을까?

그렇다면 request는 어떠한 과정으로 일어나는 지?

 

producer -> broker -> producer 이런 식으로 latency를 측정함.

broker는 데이터를 완전히 받는 직후부터 시간을 측정함.

 

 

broker를 restart 했을 때, node_netstat_TcpExt_SyncookiesSent가 spike였음(값이 높았음)

 

TCP SYN Cookies에 대해 알아야함.

이는 syn flloding을 막기 위해 이용하는 것임.

 

SYN flood attack 이란?

 

TCP 통신할 때, syn 만 보내고, SYN + ACK 을 무시함. 그러면 서버는 syn queue에 이 SYN을 보관하다가 다 차게 됨.

그러면 syn cookie를 이용하게 됨. 

 

syn cookie는 SYN 패킷에 있는 정보들을 이는 SYN + ACK의 ISN(Initial Sequence Number)로 만들어서 클라이언트에게 보냄.(원래는 random 값) 클라이언트가 서버에게 ACK을 보내면, 이 ACK의 정보를 decoding해서 connection을 맺게 됨.

 

 

 

 

그럼 왜 SYN flood가 일어났는지?

 

broker를 restart하면, 이 broker에 접속했던 client들이 일단 다른 broker에 접근을 하게 됨. restar가 되면, client는 broker들에게 fail-over를 하게 됨. 그리고 본래 broker들에게 기존 client들이 접속하게 됨.

연결하려는 client가 많아져서 syn flood 가 일어남.

 

 

tcp window가 문제가 됨?

요약하면, tcp window는 receiver가 처리할 수 있는 만큼의 양만 달라고 하는 것.

만약 데이터가 들어오는 속도가 처리 속도보다 빠르다면 데이터가 유실될 수 있음.

 

tcp 옵션에 window scaling 라는 옵션이 있음. 이는 windowsize * (2^scaling) 값이 최종 window size가 됨.

이를 이용하는 이유는 기존 window size의 최대값이 16비트 밖에 되지 않아서 그럼.

하지만 SYN cookie의   seq number는 32bit임. window scaling factor를 이용하지 못하게 됨.

-> 이는 한 번에 보낼 수 있는 데이터의 양이 적어지고 이는 throughput을 감소시킴.

 

근데 linux는 tcp timestamp가 유효하면, 이를 이용해서 window scaling 을 내장하는 기능이 있음.

그리고 scaling factor가 없어진다고 해도, timeout이 일어나는지?

 

 

실제로 net.ipv4.tcp_syncookies 값을 2로 설정하고 테스트해봤다고 함.

-> 모든 connection을 syncookies를 이용하여 연결함.

 

window scaling이 유효해야 하는 상태에서 지연이 일어나고 있었음?(잘 모르겠음)

-> broker 의 response time은 정상이지만, producer는 그렇지 않았음.

 

producer는 ack을 계속 기다리고 있었음. window size(789)가 작아서 이러한 현상이 일어났다고 함

 

window scaling 값도 1이였음. 즉 실제 window size는 789 * 2 ^ 1 = 1578이였음. 

 

 

그럼 왜 이렇게 window size가 작았을까?

커널을 분석해봤다고 함. 근데 서로 window size가 달랐음.

출처 : https://www.youtube.com/watch?v=_2F_qdwfUas&ab_channel=LINEDevelopers

broker의 window scaling factor가 7로 나옴(아까는 1로 나왔었음)

즉, producer는 1로 인지하고 있고, broker는 7로 알고 있음.(64배 차이남)

그래서 계속 기다리고 있었음.

 

실제 적용되는 값을 찾기 위해,

리눅스 커널의 tcp_select_window 함수로 window scaling 값을 조절함. 그래서 이를 hook 하는 방법을 생각함?

BPF를 이용하면 이 부분이 가능하다고 함. 유저가 작성한 프로그램이 커널에서 작동할 수 있다고 함.

 

실제 broker의 window scaling 값은 7이였음. 7을 이용하여 window size를 계산했음.

 

broker는 window scaling foactor를 7로 알고 있는 상태에서 데이터를 보냄., producer는 1로 알고 있는 상태임.

producer는 window size가 작았음. producer는 데이터를 보낼 때 마다 ack을 기다리고 있었음. 그렇기 때문에 producer reqeust 시간이 오래 걸리게 되고, 이는 timeout을 발생시킴.

 

왜 브로커 > producer 의 값과 실제 connection에 이용되는 값이 달랐을까?

-> 커널의 소스 코드에 문제가 있었다고 함.(linux kernel 3.10) 5.10 버전에 이 버그 해결되었다고 함.

 

 

해결책

 

이를 해결하기 위해, syn flood가 일어나지 않았다고 판단하도록 하게 했음.

즉, syn cookie를 끄는 방법.

-> 이는 syn flood가 일어나는 동안 syn 을 drop 하게 됨. 정상적인 접속이기 때문에 syn retry를 하게 됨. 이는 지연이 발생할 수 있음.

 

kafka의 listen backlog size를 늘리면 된다고 함. 하지만 이는 50으로 하드코딩 되어 있어서 늘릴 수 없다고 함.

이 부분을 논의 중임.

 

 

느낀 점

정말 대단하다고 느꼈습니다. 백엔드 개발자의 cs 지식이 정말 중요하다는 것을 느끼게 되었습니다.

 

궁금한 점이 하나 있습니다.

처음에는 단순히 window size가 작아서 발생한 문제라고 처음에는 이해했었습니다.

근데 broker window scaling factor가 1이면, window size의 문제는 아닌 것 같네요.

 

그렇다면 syn flood가 일어났고, 이 때문에 syn cookie를 이용해서 통신을 하게 되는데 커널 버그 때문에 서로 window size가 다르게 알고 있어서 이런 이슈가 발생한 것으로 이해했습니다.

 

그렇다면, 이 버그가 해결된 5.10 버전의 커널을 사용할 때는 syn cookie(syn flood가 일어나도)를 이용해도 이러한 문제점이 발생하지 않는 걸까요?

 

 

 

window size가 다르게 되면 어떠한 현상이 일어나는 지 알아볼 필요성이 있는 것 같습니다.

 

 

출처 : https://www.youtube.com/watch?v=_2F_qdwfUas&ab_channel=LINEDevelopers 

https://en.wikipedia.org/wiki/SYN_cookies

반응형

'BackEnd > Kafka' 카테고리의 다른 글

[Kafka] partition, consumer, producer 관계 (2)  (0) 2023.02.08
[Kafka] partition, consumer, producer 관계 (1)  (0) 2023.02.08

이번에는 두 대의 서버와 local kafka를 이용하여 실험해 본다.

 

kafka는 docker를 이용하여 실행한다.

 

producer

@Async
//항상 1 전송
public void prodcue1(String mm){
    kafkaTemplate.send("fcm",Integer.toString(new Random().nextInt()), mm);
    System.out.println(String.format("pro %s %s", mm, Thread.currentThread().getId()));
}

@Async
//항상 2 전송
public void produce2(String mm){
    kafkaTemplate.send("fcm", Integer.toString(new Random().nextInt()), mm);
    System.out.println(String.format("pro %s %s", mm, Thread.currentThread().getId()));
}

 

Consumer

//Server1
@KafkaListener(topics = "fcm", groupId = "fcm")
public void listen(ConsumerRecords<String, String> mm){
    for(ConsumerRecord<String, String> c : mm) {
        System.out.println(String.format("listen-1 partition : %s value : %s threadId :%s", c.partition(), c.value(),Thread.currentThread().getId()));
    }
}


//Server2
@KafkaListener(topics = "fcm", groupId = "fcm")
public void listen(ConsumerRecords<String, String> mm){
    for(ConsumerRecord<String, String> c : mm) {
        System.out.println(String.format("listen-2 partition : %s value : %s threadId :%s", c.partition(), c.value(),Thread.currentThread().getId()));
    }
}

 

우선 parition이 1개인 kafka와 서버 한 대를 이용하여 테스트한다.

당연하게도  server1의 하나의 consumer가 전부 읽는다.

 

다음에는 서버 한 개를 추가한다. consumer 설정은 위의 코드와 동일하다.

 

server1

server1이다. 하나의 서버로 돌릴 때와 큰 차이가 없다.

 

 

server2

server2에서는 consume이 안되는 것을 확인할 수 있다. 그 이유는 같은 그룹에서는 하나의 partition에 하나의 consumer만 접근할 수 있기 때문이다. 그룹이 partition에 대한 offset을 관리하기 때문에 이러한 현상이 일어난다.

 

 

다음에는 이 상태에서 터미널을 이용하여 kafka에 partition을 하나 추가한다.

server1

 

server2

이처럼 rebalancing이 일어난 것을 확인할 수 있다. 직접 kafka에 partition을 추가했을  뿐인데, 알아서 처리한다. 맨 마지막 줄을 주목하자.

 

결과는?

server1
server2

위에서 보다시피 server1에서는 partition 0에 관해서만 처리하고, server2에서는 1에 대해서만 처리한다.

 

갑자기 궁금해서 partition을 3개까지 늘려봤다.

server1
server2

보면 server1의 하나의 consumer가 2개의 partition에 대해서 처리하도록 바뀐 것을 확인할 수 있다. 이 rebalancing 되는 원리가 궁금하다. 나중에 시간이 되면 오픈소스를 확인해봐야겠다.

 

이번엔 server2를 종료시켜봤다.

server1

rebalancing이 일어나고,

server1

server1의 하나의 consumer가 전부 이를 처리한다.

 

다음에는 server2를 다시 작동시키고 그룹은 다른 그룹으로 변경한다. 현재 offset 읽는 option을 latest로 설정해서, 새로운 메시지를 읽지 않는다.

 

그래서 earliest로 바꾼 후에, 또 다른 그룹으로 바꾸고 실행해보았다.

server2

이전에 넣은 메시지들을 전부 들고와서 처리하는 것을 확인할 수 있다. 그리고 모두 단일 thread로만 처리한다.

 

그리고 kafka에서 topic을 삭제했다. 그러면, consumer에서 계속 에러가 발생한다. offset을 읽을 수 없다고.

무한 루프도는 것처럼 계속 나와서 이에 대해서도 처리를 해줄 필요가 있을 것 같다.

 

그리고 producer를 이용해 메시지를 보내면, topic이 저절로 생긴다. 이는 kafka 내부 옵션으로 자동으로 생성되지 않도록할 수 있다.

 

 

앞에서는 concurrency설정을 1로 했을 경우이다.

 

이번에는 2로 설정해서 해본다.

 

서버 하나, parition 하나

partition이 하나이기 때문에, cocurrency를 2로 해도 차이가 없는 것을 확인할 수 있다.(하나의 partition에는 같은 그룹의 consumer 하나만)

 

partition을 하나 더 추가한다.

server1

server1에서 두 개의 consumer를 이용하여 처리한다. 또한 consumer1(threadId 55)는 partition 0만, consumer2는 partition1만 처리한다.

 

또 partition을 하나 더 추가한다. 총 3개의 partition이 있다.

server1

consumer는 2개만 작동하고, consumer1은 0,1을 처리하고 consumer2는 2를 처리하는 것을 확인할 수 있다.

 

 

다음에는 서버2의 concurrency는 1로 설정해서 실행해본다.

server1
server2

server2의 consumer에게 하나의 partition이 할당된 것을 확인할 수 있다.

 

server2의 concurrency를 2로 설정하고 다시 실행한다.

server1
server2

server1에서 consumer2가 놀게 되고, server2는 모두 작동하는 것을 확인할 수 있다.

여기서 server1의 consumer2가 자원을 얼마나 차지하는 지 궁금하다.  나중에 꼭 소스코드를 봐야겠다.

 

앞에서는 모두 producer에서 send할 때 key값을 랜덤하게 줬다. 그렇게 한 이유가 key 가 없이 하면, 다른 partition에 데이터가 들어가지 않아서 그렇게 했었다.

 

이에 대해서 조사해보니, producer의 partitioner기본 전략은 sticky 이다. sticky는 batch(byte)가 꽉 차면,  다른 partittion의 batch에 데이터를 쌓게 된다. 또한 linger.ms라는 것도 있는데, linger.ms를 넘어가면 자동으로 partition에 batch 데이터를 보낸다. linger.ms의 기본값은 0이고, batch는 조금 크다. 그래서 key없이 하면, 하나의 partition에만 데이터가 들어갔었다.

 

 

그래서 key부분을 지우고, produce1의 batch가 1이 되면 바로 전송하도록 수정하고 실행해봤다. 이는 하나의 서버에 두 개의 method를 만들어서 실행한다.

@Async
//항상 1전송
public void prodcue1(String mm){
    kafkaTemplate.send("fcm", mm);
    System.out.println(String.format("pro %s %s", mm, Thread.currentThread().getId()));
}

@Async
//항상 2전송
public void produce2(String mm){
    kafkaTemplate2.send("fcm", mm);
    System.out.println(String.format("pro %s %s", mm, Thread.currentThread().getId()));
}

 

 

server1

결과를 보면, 2가 partition 0에는 들어가지 않는 것을 확인할 수 있다. 이는 produce2의 batch가 크기 때문에 partition 1에만 계속 2를 넣는다. partition 0에 2를 넣고 싶어도, batch가 차기 전에 보내버리기  때문에 다음 partition의 batch에 데이터를 넣을 수 없다.

 

하지만 1은 partition 0,1 두 곳 모두에서 존재하는 것을 확인할 수 있다.

 

 

 

 

 

* 혼자 블로그나 공식 문서를 보고 공부하는 것이기 때문에 틀리는 것이 있을 수도 있습니다.

반응형

parition, consumer, producer 가 정확히 어떻게 작동하는 지 궁금하여 테스트를 해보게 되었다.

 

Spring test와 @EmbeddedKafka를 이용하여 테스트 했다.

 

 

이는 producer이다. key는 random으로 아무런 값이나 보내게 된다.

@Async
//항상 1 전송
public void prodcue1(String mm){
    kafkaTemplate.send("fcm",Integer.toString(new Random().nextInt()), mm);
    System.out.println(String.format("pro %s %s", mm, Thread.currentThread().getId()));
}

@Async
//항상 2 전송
public void produce2(String mm){
    kafkaTemplate.send("fcm", Integer.toString(new Random().nextInt()), mm);
    System.out.println(String.format("pro %s %s", mm, Thread.currentThread().getId()));
}

여기서 궁금한 점은, 비동기로 작동하는 producer 2개가 같은 parition에 접근하는 경우 데이터가 정상적으로 들어가는지이다.

 

이는 producer가 작동하는 과정이다.

for(int i = 0; i < 20; i++){
	if(i%2==0) producer.produce1("1");
    else 	   producer.produce("2");
 }

 

결과는 다음과 같다.

pro 1 139
pro 1 133
pro 2 138
pro 2 136
pro 1 135
pro 1 137
pro 2 134
pro 2 136
pro 1 135
pro 2 133
pro 1 138
pro 2 140
pro 2 133
pro 1 135
pro 2 136
pro 1 134
pro 1 139
pro 2 137
pro 2 140
pro 1 138

1 10번, 2 10번 올바르게 잘 보내고 있다. 

 

아래는 consumer 설정이다.

//Consumer1.java
@KafkaListener(topics = "fcm", groupId = "fcm")
public void listen(ConsumerRecords<String, String> mm){
    for(ConsumerRecord<String, String> c : mm) {
        System.out.println(String.format("listen-1 partition : %s value : %s threadId :%s", c.partition(), c.value(),Thread.currentThread().getId()));
    }
}


//Consumer2.java
@KafkaListener(topics = "fcm", groupId = "fcm")
public void listen(ConsumerRecords<String, String> mm){
    for(ConsumerRecord<String, String> c : mm) {
        System.out.println(String.format("listen-2 partition : %s value : %s threadId :%s", c.partition(), c.value(),Thread.currentThread().getId()));
    }
}

동일한 주제와 동일한 그룹으로 메시지를 consume한다.

그리고 메세지가 온 parition과 메시지 값, 그리고 메시지의 threadId(consumer)를 확인한다.

 

같은 partition에 2개의 consumer가 존재할 경우에는 다음과 같이 작동한다.

listen-1 partition : 0 value : 2 threadId :127
listen-1 partition : 0 value : 2 threadId :127
listen-1 partition : 0 value : 2 threadId :127
listen-1 partition : 0 value : 1 threadId :127
listen-1 partition : 0 value : 1 threadId :127
listen-1 partition : 0 value : 1 threadId :127
listen-1 partition : 0 value : 2 threadId :127
listen-1 partition : 0 value : 1 threadId :127
listen-1 partition : 0 value : 1 threadId :127
listen-1 partition : 0 value : 2 threadId :127
listen-1 partition : 0 value : 2 threadId :127
listen-1 partition : 0 value : 1 threadId :127
listen-1 partition : 0 value : 2 threadId :127
listen-1 partition : 0 value : 1 threadId :127
listen-1 partition : 0 value : 1 threadId :127
listen-1 partition : 0 value : 2 threadId :127
listen-1 partition : 0 value : 1 threadId :127
listen-1 partition : 0 value : 2 threadId :127
listen-1 partition : 0 value : 1 threadId :127
listen-1 partition : 0 value : 2 threadId :127

보면, consumer 하나만 작동하여 메시지를 출력하는 것을 확인할 수 있다. 즉, Consumer2는 아예 작동을 하지 않는다.

 

partition이 2개로 늘어난 다면?

listen-1 partition : 0 value : 2 threadId :127
listen-2 partition : 1 value : 2 threadId :129
listen-2 partition : 1 value : 1 threadId :129
listen-1 partition : 0 value : 1 threadId :127
listen-2 partition : 1 value : 1 threadId :129
listen-1 partition : 0 value : 1 threadId :127
listen-2 partition : 1 value : 2 threadId :129
listen-1 partition : 0 value : 2 threadId :127
listen-2 partition : 1 value : 2 threadId :129
listen-1 partition : 0 value : 2 threadId :127
listen-2 partition : 1 value : 1 threadId :129
listen-1 partition : 0 value : 1 threadId :127
listen-2 partition : 1 value : 2 threadId :129
listen-1 partition : 0 value : 2 threadId :127
listen-1 partition : 0 value : 2 threadId :127
listen-2 partition : 1 value : 1 threadId :129
listen-2 partition : 1 value : 1 threadId :129
listen-2 partition : 1 value : 1 threadId :129
listen-2 partition : 1 value : 2 threadId :129
listen-2 partition : 1 value : 1 threadId :129

다음과 같이 consumer 2개 전부 잘 작동한느 것을 확인할 수 있다. 어느 partition 메시지를 전송할지는 producer의 key에 따라서 결정된다. 여러 partition이 섞여있는 것처럼 보인다.

 

하지만 여기서 핵심은 trehadId 127(Consumer1)은 parition 0에만 접근을 하고, threadId 129(Consumer2)는 partition1에만 접근하는 것을 확인할 수 있다. kafka 자체에서 자동으로 이렇게 할당을 해준다. 이를 rebalancing이라고 한다.

이 전략에 대해서는 따로 공부할 필요성이 있다.

 

2개의 parition에 하나의 consumer만 있다면 어떻게 될까?

listen-1 partition : 0 value : 2 threadId :127
listen-1 partition : 0 value : 2 threadId :127
listen-1 partition : 0 value : 1 threadId :127
listen-1 partition : 0 value : 1 threadId :127
listen-1 partition : 0 value : 1 threadId :127
listen-1 partition : 0 value : 2 threadId :127
listen-1 partition : 0 value : 1 threadId :127
listen-1 partition : 0 value : 2 threadId :127
listen-1 partition : 0 value : 1 threadId :127
listen-1 partition : 1 value : 1 threadId :127
listen-1 partition : 1 value : 1 threadId :127
listen-1 partition : 1 value : 1 threadId :127
listen-1 partition : 1 value : 2 threadId :127
listen-1 partition : 1 value : 2 threadId :127
listen-1 partition : 1 value : 2 threadId :127
listen-1 partition : 1 value : 2 threadId :127
listen-1 partition : 1 value : 1 threadId :127
listen-1 partition : 1 value : 1 threadId :127
listen-1 partition : 1 value : 2 threadId :127
listen-1 partition : 1 value : 2 threadId :127

이처럼 하나의 consumer가 다 처리하게 된다.

 

 

다음에는 embedded kafka가 아닌 local에 kafka를 작동시킬 예정이다.

또한 하나의 서버가 아닌 두 대의 서버를 이용하여 실험을 해본다.

 

반응형

+ Recent posts