최근에 회사에서 푸시 시스템 관련 일을 하던 중에, RDS의 connection pool이 모자란 현상이 발생했습니다.
kafka를 이용하는 시스템에서 prodcuer 2대 consumer 2대를 이용하니, 3대까지 연결은 되는데 마지막 한 대가 연결이 안되는 에러가 발생했습니다. 이는 rds의 connection pool을 넘어서 그렇습니다. 그래서 producer의 connection pool을 4개로 줄여서 해결했습니다.
진짜로 해결이 된 것일까요?
사실 connection pool을 감소시킨 producer보다는 consumer부분이 더 신경쓰였습니다. 왜냐하면 consumer가 batch 단위로 데이터를 받아오고 각각의 데이터 검사를 위해 thread를 이용합니다. 즉, batch 크기 만큼 thread가 생기고 만약 db를 조회하면 connection이 일어나게 되니 connection pool이 부족하여 deadlock이 발생할 수 있다고 생각했습니다.
그래서 한 번 connection pool을 1로 설정하고 실험해봤습니다.
Take out 1 messages라는 로그만 남고, 멈춰버렸습니다. 그러더니 나중에 에러가 발생합니다.
@Override
public NotificationSettings findNotificationSettingsByDevice(String device) {
Long notificationId = jpaFCMDeviceRepository.findNotificationIdByDevice(device).orElseThrow();
return jpaNotificationRepository.findNotificationById(notificationId).orElseThrow(() -> new NullPointerException("Not Exist NotificationSettings for device"));
}
이 부분을 이용하는데, 보면 쿼리가 2개라서 커넥션이 2개가 필요한 줄 알았습니다.
그래서 2개로 설정하고 다시 해봤습니다.
push도 성공했다고 이번엔 잘 됩니다. 물론 지금은 데이터가 하나 밖에 없기 때문에 그렇습니다. 만약 데이터가 많아 진다면, thread 개수가 증가하여 여전히 deadlock이 걸립니다.
connection이 끝나자마자 반환하면, pool이 1개여도 deadlock이 발생하지 않을 것이라고 생각했습니다. 위 코드에서는 현재 그렇게 하지 못하고 있습니다.
직관적으로 이 method를 호출하는 상위 method의 @Transacntional 때문일 것이라고 생각하고 없애고 connection pool 1개로 실행해봤습니다. 결과는 아까와 달리 푸시에 성공했다고 뜹니다.
왜 이런 지 궁금해서 알아봤습니다.
제 전체 코드입니다.
@Override
@Transactional(readOnly = true)
@KafkaListener(topics = "${spring.kafka.fcm.topic}", groupId = "${spring.kafka.fcm.group-id}")
public void distributeMessages(List<Message> messages){
log.info(String.format("Take Out %d messages.", messages.size()));
int messageCount = messages.size();
Thread[] threads = new Thread[messageCount];
//thread 지우면, connection pool 하나만 이용.
for(int i = 0; i < messages.size(); i++){
Message message = messages.get(i);
threads[i] = new Thread(() -> {
if(isValidMessage(message)) {
messenger.deliverMessage(message);
}
});
threads[i].start();
}
for(int i = 0; i< messageCount; i++){
try {
threads[i].join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
여기서 isValidMessage가
public boolean isDestroy(Message message) {
NotificationSettings deviceNotification = fcmDeviceRepository.findNotificationSettingsByDevice(message.getReceiveDevice());
NotificationType messageNotificationType = message.getNotificationType();
return !deviceNotification.isNotification(messageNotificationType);
}
@Override
public NotificationSettings findNotificationSettingsByDevice(String device) {
Long notificationId = jpaFCMDeviceRepository.findNotificationIdByDevice(device).orElseThrow();
return jpaNotificationRepository.findNotificationById(notificationId).orElseThrow(() -> new NullPointerException("Not Exist NotificationSettings for device"));
}
isDestroy를 호출합니다.
Transactional이 있는 상위 method에서 쓰레드를 만들고 isDestroy를 호출하게 됩니다. 여기서 문제가 발생합니다.
그래서 어디가 문제인지 살펴봤습니다.
우선 단순한 반복문으로 바꿔서 테스트해봤습니다.
connection pool이 1개여도 정상적으로 실행됩니다.
검사는 for문으로, 메시지를 전달하는 부분은 multi thread로 했습니다. 이 부분은 db조회가 없습니다.
@Transactional
public void distributeMessages(List<Message> messages){
log.info(String.format("Take Out %d messages.", messages.size()));
int messageCount = messages.size();
List<Message> validMessages = new ArrayList<>();
//thread 지우면, connection pool 하나만 이용.
for(int i = 0; i < messages.size(); i++){
Message message = messages.get(i);
if(isValidMessage(message)) {
validMessages.add(message);
}
else{
messageCount-=1;
}
}
Thread[] threads = new Thread[messageCount];
for(int i = 0; i< messageCount; i++){
Message message = validMessages.get(i);
threads[i] = new Thread(() -> {
messenger.deliverMessage(message);
});
threads[i].start();
}
for(int i = 0; i< messageCount; i++){
try {
threads[i].join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
이 코드는 deadlock 없이 정상적으로 작동합니다.
즉, multi thread를 할 때, thread 안에서 db를 조회하면 해당 문제가 발생하는 것을 발견했습니다.
그래서 @Transactional의 범위를 알아봤습니다.
알아보니, single thread에서만 적용이 된다고 합니다.
그래서 상위 method의 transactional을 빼고, 하위 method에 설정했습니다.
@Override
@Transactional // 추가
public boolean isDestroy(Message message) {
NotificationSettings deviceNotification = fcmDeviceRepository.findNotificationSettingsByDevice(message.getReceiveDevice());
NotificationType messageNotificationType = message.getNotificationType();
Account account = jpaAccountRepository.findByEmail("tt");
account.setEmail("ttt");
jpaAccountRepository.save(account);
if(true)
throw new RuntimeException();
return !deviceNotification.isNotification(messageNotificationType);
}
@Override
// @Transactional(readOnly = true)
@KafkaListener(topics = "${spring.kafka.fcm.topic}", groupId = "${spring.kafka.fcm.group-id}")
public void distributeMessages(List<Message> messages){
log.info(String.format("Take Out %d messages.", messages.size()));
int messageCount = messages.size();
Thread[] threads = new Thread[messageCount];
for(int i = 0; i < messages.size(); i++){
Message message = messages.get(i);
threads[i] = new Thread(() -> {
if(isValidMessage(message)) {
messenger.deliverMessage(message);
}
});
threads[i].start();
}
for(int i = 0; i< messageCount; i++){
try {
threads[i].join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.println(jpaAccountRepository.findByEmail("ttt"));
System.out.println(jpaAccountRepository.findByEmail("tt"));
}
이렇게 하면
tt가 ttt로 바뀌지 않는 것을 확인할 수 있습니다. rollback이 되었습니다.
반대로 아까처럼 상위 method에만 @Transactional을 하면 적용이 되지 않는 것을 알 수 있습니다.
기존 코드 isDestroy는 transaction이 아니고, 별개로 작동한 것입니다. isDestroy에서 db를 조회하기 위해 connection이 한 개 필요하고, 상위 method가 Transaction이라서 connection이 한개가 필요합니다. 그래서 총 2개가 필요했었습니다.
(알아보니, 쿼리당 connection이 한 개가 필요한 것이 아니라, transaction을 실행할 때 connection이 일어나고 끝나면 반환합니다.)
Trasaction때문에 이미 connection 을 하나 이용하고 있는 상태에서 thread가 생성되고 여기서 connection을 기다리게 됩니다. 하지만 상위 method는 join으로 현재 thread가 끝날 때까지 기다리게 되고, connection을 계속 가지고 있게 되어 deadlock 이 발생합니다.
결론
이를 해결하기 위해서는
@Override
@Transactional(readOnly = true)
@KafkaListener(topics = "${spring.kafka.fcm.topic}", groupId = "${spring.kafka.fcm.group-id}")
public void distributeMessages(List<Message> messages){
log.info(String.format("Take Out %d messages.", messages.size()));
int messageCount = messages.size();
List<Message> validMessages = new ArrayList<>();
for(int i = 0; i < messages.size(); i++){
Message message = messages.get(i);
if(isValidMessage(message)) {
validMessages.add(message);
}
else{
messageCount-=1;
}
}
Thread[] threads = new Thread[messageCount];
for(int i = 0; i< messageCount; i++){
Message message = validMessages.get(i);
threads[i] = new Thread(() -> {
messenger.deliverMessage(message);
});
threads[i].start();
}
for(int i = 0; i< messageCount; i++){
try {
threads[i].join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
========================================================================================
@Override
//@Transactional(readOnly = true) 부모 메소드에 있어서 없어도 됨.
public boolean isDestroy(Message message) {
NotificationSettings deviceNotification = fcmDeviceRepository.findNotificationSettingsByDevice(message.getReceiveDevice());
NotificationType messageNotificationType = message.getNotificationType();
return !deviceNotification.isNotification(messageNotificationType);
}
for문을 이용하여 검사하면 됩니다. 하지만 속도 측면에서 아쉽습니다.
@Override
// @Transactional(readOnly = true) 필요 없음.
@KafkaListener(topics = "${spring.kafka.fcm.topic}", groupId = "${spring.kafka.fcm.group-id}")
public void distributeMessages(List<Message> messages){
log.info(String.format("Take Out %d messages.", messages.size()));
int messageCount = messages.size();
Thread[] threads = new Thread[messageCount];
//thread 지우면, connection pool 하나만 이용.
for(int i = 0; i < messages.size(); i++){
Message message = messages.get(i);
threads[i] = new Thread(() -> {
if(isValidMessage(message)) {
messenger.deliverMessage(message);
}
});
threads[i].start();
}
for(int i = 0; i< messageCount; i++){
try {
threads[i].join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
========================================================================================
@Override
@Transactional // 필요
public boolean isDestroy(Message message) {
NotificationSettings deviceNotification = fcmDeviceRepository.findNotificationSettingsByDevice(message.getReceiveDevice());
NotificationType messageNotificationType = message.getNotificationType();
return !deviceNotification.isNotification(messageNotificationType);
}
이는 하위 method에 transaction을 달아주면 됩니다. 그러면 1개 만으로도 충분히 deadlock없이 가능합니다. 다른 thread들이 connection을 사용하고 있는 thread가 종료될 때 까지 기다려야 하기 때문에, connection pool이 충분하지 않으면 속도가 느립니다.
'BackEnd > spring' 카테고리의 다른 글
[Spring] AOP 테스트 (2) | 2023.12.21 |
---|---|
[Spring] 빈 주입 null pointer exception 발생 (0) | 2023.08.22 |
[인프런] 스프링 핵심 원리 고급편(김영한) 정리 (0) | 2023.07.30 |
[인프런] 스프링 핵심원리 기본편(김영한) 정리 (0) | 2023.07.28 |
[Spring] XSS 방지 (0) | 2023.05.26 |