이번에 일을 하면서 개발 환경 구축을 하게 되었습니다.

 

보통 대부분의 프로젝트가 db를 이용할 것 입니다.

하지만 로컬에서 테스트를 해보기 위해 stage나 live 서버의 데이터를 조작하면 그 db로 테스트하고 있던 사람은 불편하겠죠.

 

그렇다고 새로운 프로젝트에 인원이 투입될 때마다, 로컬에 맞는 db를 만들면 시간이 매우 낭비됩니다.

 

이를 해결하기 위해서 DB 세팅(table, user, function, index 등)을 db에 올리고 테스트할 수 있도록 합니다.

 

보통 docker-compose를 이용하여 합니다.

version: '3.1'

services:
	db:
    	images:mysql
        ...
        ...
        volumes: ./test /docker-entrypoint-initdb.d/
        ...

하지만 db 관련 정보는 volume을 이용하기 때문에 이 컨테이너를 commit 해도 db 세팅했던 것들은 유지되지 않습니다.

 

물론 로컬에 마운트를 하면 백업 본이 생기지만, 다른 곳에서는 이미지를 가져와서 실행하면 백업 본도 못 봅니다.

 

 

이를 해결하기 위해서 직접 Dockerfile을 수정했습니다.

이는 github에서 손쉽게 구할 수 있습니다.

 

Dockerfile을 보면,

volume이 설정되어 있는데, 이를 지워주면 됩니다. 

그리고 아래를 보면 COPY ... 이 있는데 이 파일도 가져와야 합니다(git에 Dockerfile이랑 같이 있습니다).

 

ENV POSTGRES_USER <user name>

ENV POSTGRES_PASSWORD <password>

COPY <sql path> /docker-entrypoint-initdb.d/

그리고 이렇게 더 넣어줘야 합니다.

만약 db 세팅이 필요가 없다면 COPY 부분은 안 넣어도 됩니다.

 

아니면, /bin/bash에 들어가서 직접 table을 만드는 등 세팅을 해주시고

docker commit을 하면 됩니다.

 

 

이렇게 하면 이미지를 배포할 경우 다른 사용자들이 db를 그대로 활용할 수 있습니다.

데이터(row)도 유지할 수 있습니다.

 

 

 

이상하게도 저는 

이 부분에서 계속 general error가 발생했다고 합니다. 

 

링크를

keyserver.ubuntu.com

로 수정하니까 잘 되었습니다.

 

저도 그 이유는 잘 모르겠네요. 여기서 삽질을 엄청한 것 같습니다.

 

반응형

최근 개발할 때, 직접 postman으로 테스트 하는 것보다는 테스트 코드를 작성해서 돌려봅니다.

 

간단하게 테스트해볼 것이 있어서 통합테스트를 했었는데, 계속 null pointer exception이 발생했었습니다.

 

이 부분이 문제되는 코드 입니다.

 

@Service
public class SimpleService {

    public Integer add(Integer a, Integer b) {
        return a+ b;
    }
}



@RestController
@RequiredArgsConstructor
public class SimpleController {
    private SimpleService simpleService;

    @RequestMapping("/")
    public Map<String, Integer> add(Integer a, Integer b) {
        Map<String, Integer> response = new HashMap<>();
        response.put("ret", simpleService.add(a,b));

        return response;
    }

}

이 코드를 가지고 아래와 같이 테스트를 해보겠습니다.

@SpringBootTest
@AutoConfigureMockMvc
class DemoApplicationTests {


    @Autowired
    private MockMvc mockMvc;

    @Test
    void contextLoads() throws Exception {
        MultiValueMap<String, String> req = new LinkedMultiValueMap<>();
        req.add("a", "1");
        req.add("b", "2");
        mockMvc.perform(MockMvcRequestBuilders.get("/").params(req))
                .andExpect(MockMvcResultMatchers.jsonPath("$.ret").value(3));
    }

}

이 때, null pointer exception이 발생합니다.

SimpleService.add(...)에서 발생한 것을 알 수 있습니다. 이 부분에 bean이 주입이 되지 않아서 이러한 에러가 발생합니다.

 

 

이를 해결하려면 매우 간단합니다.

@Service
public class SimpleService {

    public Integer add(Integer a, Integer b) {
        return a+ b;
    }
}



@RestController
@RequiredArgsConstructor
public class SimpleController {
    //private SimpleService simpleService;
    
    private final SimpleService simpleService; //버그 수정

    @RequestMapping("/")
    public Map<String, Integer> add(Integer a, Integer b) {
        Map<String, Integer> response = new HashMap<>();
        response.put("ret", simpleService.add(a,b));

        return response;
    }

}

이렇게 final만 붙이면 됩니다.

 

왜 이러한 에러가 발생했을까요?

lombok의 @RequiredArgsConstructor 때문입니다.

 

이 코드의 docs를 보면,

이런 식으로 final fields가 필요하다는 것을 알 수 있습니다. 아니면 @NonNull 같은 null을 허용하지 않도록 하면 됩니다.

 

잘 되네요!

 

 

 

반응형

코로나 격리 기간 동안 기본편, 고급편을 다 보는 것이 목표였는데 성공했네요.

 

이제 복습만 해보면 될 것 같습니다(해당 포스트는 제가 복습하기 위해 정리해 둔 것 입니다.)

 

이번 강의는 조금 어려웠습니다. 그리고 AOP를 실제로 많이 쓰는 지 의문이 들었습니다.

AOP와 프록시 객체가 중점인데 이를 실제 프로젝트에 적용해보고 싶네요.

 

https://www.inflearn.com/course/%EC%8A%A4%ED%94%84%EB%A7%81-%ED%95%B5%EC%8B%AC-%EC%9B%90%EB%A6%AC-%EA%B3%A0%EA%B8%89%ED%8E%B8/dashboard

 

로그 추적기를 만들 때, 이 강의에서는 전부 콘솔을 통해 눈으로 직접 확인함.

assertThat 같은 junit의 기능을 활용하지 않음.

(학습을 위해 그렇게 하신다고 함.)

 

그렇다면, 로그는 어떻게 테스트를 해야하나?

→ 질문을 찾아보니, 보통 로그는 테스트를 하지 않음. 하지만 너무 중요하다면 따로 테스트를 해야한다고 답변해주심.

→ 어떻게….

 

로그를 남기기 위해 로그 관련된 객체를 파라미터로 넘겨야 됨.

→ 모든 코드가 이렇게 수정되어야 함.

 

이를 수정하기 위해서 새로운 클래스를 만들고, field에 TraceId(로그 관련 객체)를 추가해줌.

그리고 필드의 TraceId의 level을 직접 수정함으로써 가능함.

→ 문제는 이 새로운 클래스가 싱글톤일 경우, 동시성 문제가 발생함.

정상적인 경우

동시성 문제 발생(새로고침 눌러서 실험)

level 값이 2에서 끝나지 않고, 깊어진 것을 볼 수 있음. 그리고 transactionId가 같음.

객체 하나를 가지고 모든 사용자(요청)을 처리하니 이러한 현상이 발생함. 당연하게도.

싱글톤을 사용하면서 동시성을 해결하려면? → 쓰레드 로컬 사용하면 됨.

 

Thread local

해당 쓰레드만 접근할 수 있는 저장소.

(단점도 있을 거 같은데?)

주의점은 쓰레드가 종료할 때, .remove() 메소드를 통해서 객체를 지워줘야함. (메모리 누수가 일어날 수 있음)

 

만약 제거 하지 않다면?

WAS 같은 thread pool에서 심각한 문제 발생.

thread pool에 thread를 반환하게 되면, 기존에 사용하던 데이터가 Thread local에 남아있음.

다른 사용자에게 해당 thread가 할당되게 되면, 이전 사용자가 이용한 데이터 정보를 조회할 수 있음.

→ 그래서 사용자의 요청이 끝날 때, ThreadLocal의 값을 .remove()로 제거해야함.

 

 

템플릿 메서드 패턴

TraceStatus status = null;
try {
	status = trace.begin("message");
	//핵심 기능 호출
	trace.end(status);
} catch (Exception e) {
	trace.exception(status, e);
	throw e;
}

변하는 것과 변하지 않는 것을 분리해야함.

abstract class로 만들어서 상속받게 만들면, 매번 새로운 클래스 파일을 만들어야 함.

→ 익명 내부 클래스를 만들면 됨.

@Test
void templateMethodV1() {
    AbstractTemplate template1 = new SubClassLogic1();
    template1.execute();

    AbstractTemplate template2 = new SubClassLogic2();
    template2.execute();
}

-> 수정

@Test
void templateMethodV2() {
    AbstractTemplate template1 = new AbstractTemplate(){

        @Override
        protected void call() {
            log.info("비즈니스 로직1 실행");
        }
    };

    template1.execute();

    AbstractTemplate template2 = new AbstractTemplate(){

        @Override
        protected void call() {
            log.info("비즈니스 로직2 실행");
        }
    };

    template2.execute();
}

대신 클래스 이름 뒤에 $1 같은 문자가 붙음.

 

이 템플릿 패턴을 사용하는 모든 코드는 이러한 작업을 해줘야함. 과연 이게 좋은 건가…

코드가 좀 지저분해 보임(개인적으로는)

물론 유지보수하기는 좋음

만약 로그가 변경이 되면, 로그 템플릿 클래스만 변경하면 되기 때문임.

→ SRP를 지킴. → SRP가 잘 지켜져 있다면, 변경이 쉬움.

 

템플릿 메서드 패턴은 골격을 정해두고, 일부 메소드는 하위 클래스가 구현함.

→ 상속을 이용함.

 

현재 자식 클래스 입장에서는 부모 클래스 기능을 사용안하고 있음.

(독립적인 call을 각자 구현하고 있음. 부모 클래스의 기능은 사용하지 않고 있음.)

사용하지 않음에도 부모 클래스에 강하게 의존하게 됨. 좋은 설계는 아니라고 함.

→ 이를 해결하기 위해 전략 패턴을 이용하면 됨.

 

 

전략 패턴(strategy)

변하는 부분은 따로 인터페이스(strategy)로 만들고, 변경되지 않는 부분은 클래스(context)로 만들어서 해결.

즉, 인터페이스에 역할을 위임함.

@Slf4j
public class ContextV1 {

		//call()이 있음. 이를 구현해야함. 이는 인터페이스임.
    private Strategy strategy;

    public ContextV1(Strategy strategy) {
        this.strategy = strategy;
    }

    public void execute() {
        long startTime = System.currentTimeMillis();

        strategy.call();

        long endTime = System.currentTimeMillis();
        long resultTime = endTime - startTime;
        log.info("resultTime={}", resultTime);
    }
}

Context는 Strategy 인터페이스에만 의지.

코드보면 그냥 스프링에서 의존성 주입하는 것과 같음.

전략 패턴은 인터페이스에만 의존하고 있음. 템플릿 메소드 패턴은 부모 클래스의 변경에 민감했었음.

 

익명 클래스를 람다로 변경할 수 있음.

(인터페이스의 메소드가 1개라면)

 

이 패턴은 선 조립, 후 실행 방식에서 유용.

(context와 strategy 조립 후, 실행함.)

단점은 조립 후에 전략을 변경하기가 어려움.

setter로 변경해도 되지만, 만약 context가 싱글톤이라면 동시성 문제가 발생하게 됨.

public class ContextV2 {

    public void execute(Strategy strategy) {
        long startTime = System.currentTimeMillis();

        strategy.call();

        long endTime = System.currentTimeMillis();
        long resultTime = endTime - startTime;
        log.info("resultTime={}", resultTime);
    }
}

파라미터로 받아오면 됨. 멋진데?

원하는 전략을 줄 수 있음. 단점은 실행할 때마다 전략을 줘야 함.

 

 

템플릿 콜백 패턴

콜백은 다른 함수로 실행 가능한 코드를 넘겨주고, 호출한 메소드에서 이러한 코드를 실행함.

저 위에 있는 contextv2가 템플릿 콜백임.

ContextV2가 템플릿 역할이고, strategy가 콜백임.

(ContextV1은 아님)

이는 GOF 패턴이 아닌, 스프링에서 주로 이용하는 패턴임.

 

이러한 코드의 문제점은 콜백으로 넘겨주는 코드를 다 작성을 해야함.

→ 즉 원본 코드의 수정이 필요함. 이를 해결하기 위해 프록시 패턴 이용.


인터페이스 컨트롤러에는 @RequestParam 같은 어노테이션을 달아줘야 함. 없으면 가끔 인식 안된다고 함.

@SpringBootApplication에서 @Import를 이용하여 원하는 설정파일 직접 주입할 수 있음.

(BasePackages 범위 밖에 있어도 등록가능)

@Controller를 쓰면 자동으로 Component 스캔 대상이 되어버림.

→ 수동 빈 등록을 하고 싶다면 RequestMapping 사용.

 

프록시

클라이언트 → 프록시 → 서버

중간에서 간접적으로 프록시가 요청을 할 수 있음.

클라이언트가 기대한 것 외에 다른 부가기능을 수행할 수 있음.

클라이언트는 프록시를 통해 요청한 결과만 받으면 됨. 어떠한 부가 기능이 수행 되었는 지는 몰라도 됨.

 

주요 기능

접근 제어.

  • 권한에 따른 접근 차단
  • 캐싱
  • 지연 로딩

부가 기능 추가

  • 부가 기능 수행(로그를 남기는 등)

 

프록시 패턴은 접근 제어가 목적.

데코레이터 패턴은 새로운 기능 추가가 목적

e.g)

프록시 캐시처럼 활용하기?

interface Subject {
	T call();
}

class target implments Subject{
	Server server;
	T call() {
		server....()
	}
}

class proxy implents Subject {
	Subject target;
	String cacheValue;
	T call() {
		if(cacheValue == null) {
			cacheValue = target.call()
		}

		return cacheValue
	}
}

이런 식으로도 활용할 수 있음.

 

디자인 패턴에서 중요한 것은 의도.

데코레이터와 프록시 구분 기준은 의도임.

그리고 레포지토리나 서비스 등 인터페이스로 이용하면, 위처럼 프록시 패턴을 도입할 수 있음.

 

프록시가 이 인터페이스(컨트롤러, 서비스 등)를 상속받고, 이 프록시 내부에 진짜 객체를 이용하여 사용자의 요청을 처리할 수 있음.

→ 이런 식으로 하면 기존의 코드 변경이 사라짐.

 

그 이유는 기존 구체 객체를 빈으로 주는 게 아닌, 프록시를 주면 됨.

그러면 프록시 객체에서 로그를 남긴 후, 실제 객체에게 요청을 처리해달라고 하면 됨. 매우 멋진 방법처럼 보임.

 

구체 클래스를 상속해서 해도 됨.

하지만 super()로 부모 클래스의 생성자를 호출해야함. 인자로 null로 줘도 됨.

→ 왜냐하면 부모 클래스의 기능을 사용하지 않기 때문에.

(실제 객체를 필드로 가지고 있고, 그 객체를 통해 기능을 수행하기 때문에 부모 클래스의 기능을 사용하는 것이 아님.)

 

그래서 보통 인터페이스 기반이 더 좋음. 상속의 제약에서 자유로움.

인터페이스의 단점은 해당 인터페이스가 필요하다는 것이 단점임. 테스팅할 때도 단점이 있다고 함. 하지만 변경이 거의 일어나지 않는다면 구체 클래스를 하는 것이 더 좋을 수도 있음. 또한 실무에서는 둘 다 섞인 경우도 있기 때문에 우선 둘 다 알아둬야 함.

 

프록시의 단점은 프록시 클래스를 너무 많이 만들어야 함.

만약 적용할 클래스가 많아지면, 프록시 클래스도 그 만큼 만들어줘야 함.

→ 동적 프록시가 이를 해결해줌.


동적 프록시

 

리플렉션을 이용하여 메소드를 동적으로 변경가능.

리플렉션으로 method 따로 분리할 수 있음.

그리고 이를 프록시 객체에 method 를 넘겨주면 됨.

 

method.invoke(target, args) 로 실행할 수 있음. target은 해당 메소드를 실행할 인스턴스. args는 넘길 인수.

InvocationHandler의 구현 객체도 필요함. 거기에 invoke를 오버라이딩 해야함

 

그리고 jdk 리플렉션에서 지원해주는 proxy가 있음. Proxy.newProxyInstance를 통해 동적인 프록시 객체를 만들어줌. 하지만 이는 인터페이스에 적용 가능.

 

하지만 리플렉션은 런타임에 작동하기 때문에, 컴파일 시점에 오류 못 잡음. 되도록 사용하면 안됨.

private static final String[] PATTERNS = {"request*", "order*", "save*"};

@Bean
public OrderServiceV1 orderServiceV1(LogTrace logTrace) {
	OrderServiceV1 orderService = new OrderServiceV1Impl(orderRepositoryV1(logTrace));

	OrderServiceV1 proxy = (OrderServiceV1)Proxy.newProxyInstance(
		OrderServiceV1.class.getClassLoader(),
		new Class[]{OrderServiceV1.class},
		new LogTraceFilterHandler(orderService, logTrace, PATTERNS)
	);

	return proxy;
}

proxy가 빈으로 등록이 됨.

 

만약 인터페이스가 아닌 클래스에 적용하고 싶다면?

CGLIB(code generatpr library) 사용

 

이를 사용하는 경우는 거의 없음. 스프링에서 proxy factory라는 것을 제공해주기 때문.

이 안의 MethodInterceptor를 구현하면 됨.

 

그리고 Enhancer에 앞에서 구현한 것과 프록시를 적용할 객체를 넘겨주면 됨.

enhancer.create() 하면 프록시를 적용할 객체를 반환해줌.

하나는 클래스만, 하나는 인터페이스만 지원을 해줌.

→ 스프링에서 제공해주는 프록시 팩토리는 둘 다 이용가능.


스프링의 프록시 팩토리

프록시 팩토리는 Advice를 만들어서 처리를 하게 됨. CGLIB나 JDK 동적 프록시는 이러한 Advice를 호출하게 됨.

→ 즉, JDK, CGLIB 안에 코드를 따로따로 구현할 필요가 없음.

 

프록시 팩토리도 MethodInterceptor를 구현하면 됨(CGLIB와 다른 거임! 패키지 주의)

 

invociation.proceed() 하면 됨.

이전과 달리 target에 대한 정보가 안보임.

@Slf4j
public class TimeAdvice implements MethodInterceptor {
	@Override
	public Object invoke(MethodInvocation invocation) throws Throwable {
		log.info("TimeProxy 실행");
		long startTime = System.currentTimeMillis();
		...
	}

이는 MethodInvocation invocation 내에 이미 들어있음.

이는 JDK가 기본임. 인터페이스가 없다면 CGLIB를 통해 동적프록시를 생성함.

이런 식으로 사용 가능

ServiceInterface target = new ServiceImpl();
ProxyFactory proxyFactory = new ProxyFactory(target);
proxyFactory.addAdvice(new TimeAdvice());

ServiceInterface proxy = (ServiceInterface) proxyFactory.getProxy();

매우 간단해짐.

 

포인트컷(Pointcut)은 필터링 로직. 클래스와 메서드 이름으로 필터링 가능.

어드바이스(Advice)는 앞에서 본 것처럼 프록시가 호출하는 부가 기능.

어드바이저(Advisor)는 둘 다 가지고 있는 것.

여러 프록시를 사용하려면?

//clients -> proxy2 -> proxy1 -> server

DefaultPointcutAdvisor advisor2 = new DefaultPointcutAdvisor(Pointcut.TRUE, new Advice2());
DefaultPointcutAdvisor advisor1 = new DefaultPointcutAdvisor(Pointcut.TRUE, new Advice1());

ServiceInterface target = new ServiceImpl();
ProxyFactory proxyFactory1 = new ProxyFactory(target);

proxyFactory1.addAdvisor(advisor2);
proxyFactory1.addAdvisor(advisor1);

이는 프록시가 여러 개 생성되는 것이 아님!

proxy가 여러 advisor를 사용하는 것임. 하나의 프록시에 어려 어드바이저를 적용되는 것.

등록 순서도 중요함

하지만 프록시 패턴에도 문제가 있음.

Configuration에서 bean을 만들때 중복되는 코드가 많음. 모든 클래스의 프록시 설정 코드를 다 작성해줘야 함.

컴포넌트 스캔을 하면 프록시 적용이 불가능함.

이미 기존 객체가 빈으로 바로 주입이 되어버림. 이 대신 프록시를 주입 해주도록 설정 해야하는데….

이를 처리하기 위해 빈 후처리기 이용하면 됨.


빈 후처리기(BeanPostProcessor)

빈을 등록하기 전에 빈을 조작할 수 있음.

후 처리 작업 시에 객체를 다른 객체로 바꿔치기도 할 수 있음.

 

BeanPostProcessor 인터페이스를 구현해야함.

 

before 메소드는 @PostConstruct 발생하기 전.

after는 초기화되고 난 후.

 

자바 8 이후는 default 로 선언되어 있어서 override를 안해도 오류가 안 뜸.

@Slf4j
public class PackageLogTraceProxyPostProcessor implements BeanPostProcessor {
	private final String basePackage;
	private final Advisor advisor;

	public PackageLogTraceProxyPostProcessor(String basePackage, Advisor advisor) {
		this.basePackage = basePackage;
		this.advisor = advisor;
	}

	@Override
	public Object postProcessAfterInitialization(Object bean, String beanName) throws	BeansException {
		log.info("param beanName={} bean={}", beanName, bean.getClass());

		//프록시 적용 대상 여부 체크
		//프록시 적용 대상이 아니면 원본을 그대로 반환
		String packageName = bean.getClass().getPackageName();
	
		if (!packageName.startsWith(basePackage)) {
			return bean;
		}
	
		//프록시 대상이면 프록시를 만들어서 반환
		ProxyFactory proxyFactory = new ProxyFactory(bean);
		proxyFactory.addAdvisor(advisor);
		Object proxy = proxyFactory.getProxy();
	
		log.info("create proxy: target={} proxy={}", bean.getClass(),
		
		proxy.getClass());
		return proxy;
	}
}

AnnotationAwareAspectJAutoProxyCreator라는 빈 후처리기가 자동으로 등록이 됨?

(자동 프록시 생성기)

 

모든 Advisor 빈을 조회함. 이에 있는 포인트컷을 이용하여 해당 객체에 프록시를 적용할 지 결정함.

해당 객체의 모든 메서드를 포인트컷에 매칭해봄. 하나라도 만족하면, 프록시 대상이 됨.

위처럼 따로 PostProccesor를 구현안해도 됨. 그냥 Advisor만 빈으로 등록하면 됨.

→ 스프링이 알아서 프록시로 해줌.

 

근데 여러 advisor가 있다면 순서는 어떻게 적용이 되는지? → @Order로 선언을 해주면 되나.(이는 강의에는 없음. 뒤쪽에 나옴)

 

스프링이 자체로 내부에서 사용하는 bean에도 해당 패턴이 있으면 로그를 남기는 문제가 있음.

(orderController 같은 빈을 등록할 때도 로그가 남음. pattern에서 order라는 메소드가 들어가있으면, 전부 로그를 남기도록 했기 때문.)

 

AspectJExpressionPoincut으로 이를 해결가능함.

실무에서 이를 주로 이용한다고 함.

정규표현식으로 처리 가능.

(자세한 건 AOP 부분에서 설명 나오는 듯 함.)

 

만약 여러 개의 advisor를 등록 시, 하나의 adivsor의 포인트컷만 만족해도 프록시로 생성됨. 대신 프록시에 만족하는 advisor만 포함.

(그렇다면 조건마다 여러 프록시 객체가 만들어지는 것인가? 아니면 하나의 프록시에서 매번 포인트컷으로 필터링을 하는지)


Aspect 프록시

@Aspect 어노테이션으로 편리하기 프록시 생성가능.

@Aspect
public class LogTraceAspect {
	
	...

	//pointcut
	@Around("execution(*hello.proxy.app..*(..))")
	public Object method(ProceedingJoinPoint joinPoint) throws Throwable {
		//joinPoint에 정보가 들어있음.(메소드)
		//Advice 로직 작성하면 됨.
	}

	...
}

앞에서 본 AnnotationAwareAspectAutoProxyGenerator는 @Aspect의 Around부분을 Advisor로 변환하여 저장해줌.

순서 정하려면 @Order 사용하면 됨


@AOP(Aspect Oriented Programming)

애플리케이션은 핵심기능(비즈니스 로직?), 부가기능(트랜잭션, 로그 추적 등)으로 구분할 수 있음.

중복되는 부가 기능을 여러 클래스에 적용하려면?

→ 모든 클래스에 적용하기 위해 코드를 수정해주거나 유틸리티 클래스를 만들어서 적용시킴.

→ 이는 유지보수 하기 힘들고 구조가 복잡해짐.

이를 해결하기 위해 AOP가 나옴.

 

횡단 관심사를 깔끔하게 모듈화할 수 있음.

  • 오류 검사 및 처리
  • 동기화
  • 성능 최적화(캐싱)
  • 모니터링 및 로깅

어떤 방식으로 로직을 추가할 수 있나?

  • 컴파일 시점(AspectJ 사용)
    • 컴파일된 시점에 원본 로직에 부가 기능 로직이 추가됨. 위빙이라고 함.
    • 단점은 특별한 컴파일러가 필요하고 복잡함.
  • 클래스 로딩 시점(AspectJ 사용)
    • 자바는 JVM에 내부의 클래스 로더에 .class 파일을 저장함.
    • 이 시점에 aspect를 적용하는 것이 로드 타임 위빙임
    • 로더 조작기를 지정해야 하는데, 번거롭고 운영하기 어려움.
  • 런타임 시점(프록시, 지금까지 했던 것.)
    • DI, 프록시, 빈 포스트 프로세서 같은 것들을 동원해서 프록시를 통해 스프링 빈에 부가 기능을 추가함.
    • 프록시를 사용하기 때문에 일부 제약이 있음. 메서드 호출에만 적용할 수가 있음.(생성자에는 안됨?)
    • 실제 대상 코드는 그대로 유지가 됨. 항상 프록시를 통해서 조작함.
    • 스프링 빈에만 적용이 가능함.

AspectJ는 사용하기 어려움.

스프링 AOP는 편리함. 그리고 실무에서는 이 기능만 이용해도 대부분의 문제를 해결할 수 있음.

 

AOP 용어

JoinPoint

  • 어드바이스가 적용될수 있는 위치, 메소드 실행, 생성자 호출, 필드 값 접근, static 메서드 접근 같은 프로그램 실행 중 지점.
  • AOP를 적용할 수 있는 모든 지점.(앞에서는 생성자는 안된다고 하지 않았나)
  • 스프링 AOP는 프록시 방식 사용하기 때문에, 메소드 실행 지점으로 제한됨.

Pointcut

  • 어드바이스가 적용될 위치 선별.
  • 스프링 AOP는 메서드 실행 지점에만 설정가능.

Target

  • 어드바이스를 받는 객체. 포인트컷으로 결정.

Advice

  • 부가 기능

Aspect

  • 어드바이스 + 포인트컷을 모듈화 한 것.
  • @Aspect
  • 여러 어드바이스와 포인트 컷 존재.

Advisor

  • 하나의 어드바이스와 하나의 포인트 컷.
  • 스프링 AOP에서만 사용하는 용어

Weaving

  • 포인트 컷으로 결정한 타겟의 조인 포인트에 어드바이스 적용하는 것.
  • 핵심 기능에 영향 주지 않고 부가 기능 추가함.

AOP proxy

  • AOP를 적용하기 위한 프록시
@Pointcut("execution(* hello.aop.order..*(..))") //pointcut expression
private void allOrder(){} //pointcut signature

@Around("allOrder()")
public Object doLog(ProceedingJoinPoint joinPoint) throws Throwable {
	log.info("[log] {}", joinPoint.getSignature());
	return joinPoint.proceed();
}
//hello.aop.order 패키지와 하위 패키지
@Pointcut("execution(* hello.aop.order..*(..))")
public void allOrder(){}

//클래스 이름 패턴이 *Service
@Pointcut("execution(* *..*Service.*(..))")
private void allService(){}

//hello.aop.order 패키지와 하위 패키지 이면서 클래스 이름 패턴이 *Service
@Around("allOrder() && allService()")
...

//pointcut을 다른 클래스에서 만들었다면?
@Around("hello.aop.order.aop.Pointcuts.allOrder()")
...

드디어 순서 관련해서 나옴.

하나의 애스펙트에 여러 어드바이스가 있으면 순서를 보장받을 수 없음.

(method에 @Order을 해도 순서가 보장이 안됨)

 

그래서 클래스 단위로 쪼개야 함.

 

pointcut excecution 지시자(주로 사용됨)

"execution(public String hello.aop.member.MemberServiceImpl.hello(String))"

매칭 조건

  • 접근제어자?: public
  • 반환타입: String
  • 선언타입?: hello.aop.member.MemberServiceImpl
  • 메서드이름: hello
  • 파라미터: (String)
  • 예외?: 생략

. 은 해당 위치의 패키지

..은 그 하위 패키지도 포함.

 

파라미터에서 (..) 은 파라미터 타입과 수가 상관없다는 뜻. *은 무슨 타입이든 상관 없다는 뜻.

그리고 부모 타입으로 설정하면 자식 타입도 다 적용 가능 함.

(단, 부모 타입에 있는 메서드만 적용됨.)

 

within은 타입만 지정. 타입 내의 모든 메소드에 적용.

주의점은 타입이 정확하게 맞아야 함(부모 타입 안됨)

“within(hello.aop.member.MemberServiceImpl)”

 

target은 부모 클래스의 메서드까지 적용 됨.(자식 클래스가 아님. 부모임)

 

args은 인자가 주어진 타입의 인스턴스인 조인 포인트로 매칭.

(execution의 인자 부분. execution은 인자의 타입이 정확히 매칭되어야 함. args는 부모 타입 허용함.)

args는 단독 사용 보다는 보통 파라미터 바인딩에서 주로 사용됨.

“args(String)”

 

@target, args, @args는 혼자 사용이 안됨.

(이유는 잘 이해가 안감. 추후 복습 필요.)

 

@annotation은 해당 어노테이션이 있으면 적용함.

“@Around(”@annotation(hello.aop.member.annotation.MedthodAop”)

 

bean 이름으로도 적용가능

@Around("bean(orderService) || bean(*Repository)")

 

이런 식으로 인자로 뭔가를 받을 수 있음?

@Around("allMember() && args(arg,..)")
public Object logArgs2(ProceedingJoinPoint joinPoint, Object arg) throws Throwable{
	...
}

뒤에 ..이 있으면 더 받을 수도 있는 건가…? 어렵네. 잘 모르겠음 이 부분도.

→ 나중에 맨 정신일 때 봐야할듯.

 

(잘 사용은 안한다고 함. 중요하지 않다고 하심.)

this는 스프링 빈 객체(aop 프록시)를 대상으로 하는 조인 포인트.

target은 프록시가 실제로 가르키는 실제 대상으로 하는 조인 포인트.

(이 부분은 그냥 듣기만 함. 메모 안했음.)

 

@Around("@annotation(retry)")
public Object doRetry(ProceedingJoinPoint joinPoint, Retry retry) throws Throwable

이런 식으로도 인자 받을 수 있음. 인자로 Retry retry가 있기 때문에 자동으로 해당 annotation에 대한 정보를 받아올 수 있음.


AOP 실무 유의점

 

aop를 적용하려면 항상 프록시를 통해 객체를 호출해야함.

문제는 대상 객체의 내부에서 메서드 호출이 발생하면, 프록시를 거치지 않고 대상 객체를 직접 호출함.(실무에서도 만날 수 있는 문제라고 하심)

@Slf4j
@Component
public class CallServiceV0 {
	public void external() {
		log.info("call external");
		internal(); //내부 메서드 호출(this.internal()) 프록시를 거치지 않음.
	}

	public void internal() {
		log.info("call internal");
	}
}

@Slf4j
@Aspect
public class CallLogAspect {
	@Before("execution(* hello.aop.internalcall..*.*(..))")
	public void doLog(JoinPoint joinPoint) {
		log.info("aop={}", joinPoint.getSignature());
	}
}

1. //프록시 호출
2. CallLogAspect : aop=void hello.aop.internalcall.CallServiceV0.external()
3. CallServiceV0 : call external
4. CallServiceV0 : call internal

internal에는 프록시가 호출이 되지 않았음. 그 이유는 this.internal()을 호출하기 때문임.

proxy → target이 되는데 target에서 자신의 method를 호출함.

스프링 프록시의 한계임.

AspectJ는 이런 문제가 발생하지 않음. 그 이유는 실제 byte code를 조작해서 코드를 넣어버리기 때문에.

 

이를 해결하기 위해서 자기 자신을 주입하면 됨

@Slf4j
@Component
public class CallServiceV1 {
	private CallServiceV1 callServiceV1;

	//생성자로 하면 순환 참조를 하기 때문에 문제가 생김. 프록시가 먼저냐 이 bean이 먼저냐.
	//그래서 setter로 주입을 함. 실무에서 신경쓰기 진짜 어려울 듯.
	@Autowired
	public void setCallServiceV1(CallServiceV1 callServiceV1) {
		this.callServiceV1 = callServiceV1;
	}

	public void external() {
		log.info("call external");
		callServiceV1.internal(); //외부 메서드 호출
	}

	public void internal() {
		log.info("call internal");
	}
}

그 다음 해결책은 지연 조회

@Slf4j
@Component
@RequiredArgsConstructor
public class CallServiceV2 {
	// private final ApplicationContext applicationContext;
	//기본편 싱글톤에 프로로타입 주입할 때 봤던 것.
	private final ObjectProvider<CallServiceV2> callServiceProvider;

	public void external() {
		log.info("call external");
		// CallServiceV2 callServiceV2 =
		applicationContext.getBean(CallServiceV2.class);
		CallServiceV2 callServiceV2 = callServiceProvider.getObject();
		callServiceV2.internal(); //외부 메서드 호출
	}

	public void internal() {
		log.info("call internal");
	}
}

ObjectProvider는 실제 객체를 사용할 때 빈을 찾아옴. 자기 자신을 주입받지 않음.

앞에서는 뭔가 어색함.

 

제일 좋은 방법은 내부 호출이 없도록 구조를 바꾸는 방법임. 스프링에서도 이 방법을 권장함.

@Slf4j
@Component
@RequiredArgsConstructor
public class CallServiceV3 {
	private final InternalService internalService;
	public void external() {
		log.info("call external");
		internalService.internal(); //외부 메서드 호출
	}
}

@Slf4j
@Component
public class InternalService {

	public void internal() {
	log.info("call internal");
	}

}

깔끔하긴 한데 클래스 파일이 너무 많이 생길 것 같은데?

 

 

jdk 동적 프록시 한계점.

구체 타입으로 캐스팅이 불가능함. 인터페이스만 가능.

e.g)

interface MemberService ← MemberServiceImpl

MemberServiceImpl proxy = (MemberServiceImpl) proxy.getProxy();

하면 에러가 발생함.(타입 캐스팅 에러)

 

CGLIB는 성공함. MemberService로도 됨.

이러한 타입 캐스팅 에러는 의존관계 주입 시에 발생함.

@Slf4j
@SpringBootTest(properties = {"spring.aop.proxy-target-class=false"}) //JDK 동적 프록시, DI 예외 발생
//@SpringBootTest(properties = {"spring.aop.proxy-target-class=true"}) //CGLIB 프록시, 성공
@Import(ProxyDIAspect.class)
public class ProxyDITest {

	@Autowired MemberService memberService; //JDK 동적 프록시 OK, CGLIB OK
	@Autowired MemberServiceImpl memberServiceImpl; //JDK 동적 프록시 X, CGLIB OK

	@Test
	void go() {
		log.info("memberService class={}", memberService.getClass());
		log.info("memberServiceImpl class={}", memberServiceImpl.getClass());
		memberServiceImpl.hello("hello");
	}
}

CGLIB는 구체 클래스를 기반으로 만들어 지고,

JDK 동적 프록시는 인터페이스 기반으로 만들어지기 때문에.

 

보통 추상화한 인터페이스로 DI를 받아야 함. DI의 장점은 OCP인데, 이러한 장점이 사라짐.

잘 설계되어 있다면, 이러한 문제가 거의 발생하지 않음.

하지만 구체 클래스를 쓰는 경우가 필요한 경우도 있음.

 

그럼 CGLIB만 사용하면 되지 않나?

→ 문제점이 있음.

  • 대상 클래스에 기본 생성자 필수임.
    • 이는 구체 클래스를 상속 받음. super()가 자동으로 실행됨. 그렇기 때분에 기본 생성자를 만들어야 함.(파라미터가 없는 생성자)
  • 생성자 2번 호출됨
    • target객체 생성할 때 생성자가 호출됨. 그리고 프록시 객체를 생성할 때 부모 클래스의 생성자 호출 됨.
  • final 키워드 클래스, 메서드 사용 불가.
    • 클래스에 final 있으면 상속이 안됨. 메서드에 있으면 오버라이딩이 안됨.

스프링 4.0 부터는 objenesis 라는 라이브러리를 이용하여 기본 생성자 없어도 객체 생성이 가능해짐.

4.0부터 CGLIB 생성자 2번 문제 호출도 해결되었음.

→ CGLIB의 단점이 사라짐.(final 클래스나 final 메서드는 거의 사용되지 않음.

 

스프링 부트 2.0부터 CGLIB를 기본를 사용함으로써 구체 클래스 타입 캐스팅을 할 수 있게 했음.

 

 


스프링이 어렵다는 것을 다시 한 번 느낄 수 있었습니다. 제가 모르는 것이 너무 많네요.

현재 회사에서 실무에 투입되기 전에 다른 강의도 좀 보고 가야할 것 같습니다.

 

특히 db와 spring에 대해 좀 공부를 할 필요성이 있는 것 같네요.

 

이번 강의도 좋았습니다. 스프링 부트에서 주로 사용하는 패턴들도 보고, 이를 활용해서 다양한 기능을 구현하는 것이 인상깊었습니다.

 

디자인 패턴도 공부를 해봐야 겠네요. 대충만 알고 있었는데, 이번 강의들을 통해 부족함을 많이 느낄 수 있었습니다.

반응형

이번에 코로나에 걸리면서 일주일 동안 휴가를 가지게 되었습니다.

 

이 기간 동안 인프런의 김영한님 강의를 전부(기본, 고급)을 볼 계획입니다.

 

https://www.inflearn.com/course/%EC%8A%A4%ED%94%84%EB%A7%81-%ED%95%B5%EC%8B%AC-%EC%9B%90%EB%A6%AC-%EA%B8%B0%EB%B3%B8%ED%8E%B8/dashboard

 

이는 기본편을 듣고 제가 보기 편하게 정리한 내용입니다.

 

기초(김영한)

스프링 부트 장점

애플리케이션 쉽게 생성가능. tomcat이 내장임

외부 라이브러리(3rd path) 자동 구성

 

SOLID

OCP를 지키려면 객체를 생성하거나 그러한 것들을 해주는 무언가가 있어야 함.(스프링 DI도 가능)

DIP 추상화에 의존. 구체화에 의존 x. 역할만 알면 되고 구체적인 객체까진 몰라도 됨.

너무 기초.

 

추상화를 위해 인터페이스를 만들면 비용 발생.

→ 확장할 기능 없다면 구체 클래스 직접 사용. 추후 리팩터링 하도록.

추상적인 것과 구체적인 객체 둘 다 의존하는 것은 좋지 않음(DIP 위반)

→ 이렇게 되면, 다른 객체로 교체하려면 수정을 해야 하는 일이 생길 수가 있음(OCP 위반)

 

관심사의 분리

필드에서 직접 new를 통해서 객체를 주지 않도록.

그래서 AppConfig 같은 클래스를 만들어서 이를 주입하도록 수정.

그리고 생성자를 통해서 주입함.

 

구체적인 어떤 클래스가 주입될 지는 외부에서 결정이 됨.(AppConfig)

→ dependency injection

 

만약 할인 전략이 바뀐다면, 이에 해당하는 객체를 만들어주고 AppConfig 부분만 수정해주면 됨. 매우 간단함.

OCP도 만족이 됨. 이를 사용하는 클라이언트 코드는 수정이 필요가 없음.

IoC, DI, 컨테이너

IoC는 프레임워크 같은 곳에서 뭔가를 해주는 것(제어권 역전)

프레임워크는 제어를 알아서 해줌(junit 도)

 

라이브러리는 개발자가 직접 제어.(main 등)

 

정적인 의존 관계는 어플리케이션을 실행 안해도 알 수 있음(import)

동적인 의존 관계는 실행 시점에 결정됨.

 

DI는 런타임(실행 시점)에 객체를 만들고 주입하게 됨.

장점은 앞에서 본 것 처럼 클라이언트 코드 변경이 없음. 그리고 동적인 객체 관계를 쉽게 바꿀 수 있음.(어플리케이션 코드를 손을 대지 않는다는 뜻)

객체 생성하고 DI를 해주는 것을 IoC 컨테이너 혹은 DI 컨테이너라고 함.

(최근에는 DI컨테이너라고 한다고 함)

 

ApplicationContext(스프링 컨테이너임)에서 bean 정보를 가져올 수 있음.

bean이름은 method이름으로 되어 있음.

(물론 Bean(name = “…”) 으로 이름 바꿀 수 있음)

 

주의할 점은 bean 이름 중복되지 않도록.

applicationContext는 beanFactory를 상속받음.

 

싱글톤의 주의점은?

여러 클라이언트가 하나의 객체를 공유하게 됨.

→ stateless 로 설계해야함. 필드 대신 지역변수가 thread local 같은 것을 이용해야 함.

(race condition 피하려고)

 

bean annotation이 있다면,

→ 이미 존재할 경우 기존 객체 반환

→ 없다면 기존 로직 작동.

그래서 싱글톤으로 작동하게 됨.

 

Configuration 어노테이션이 없다면, 싱글톤이 아니게 됨. 호출될 때 마다 새로운 객체 만들어버림.

작동은 함.

 

컴포넌트 스캔은 @Component가 붙은 클래스를 스캔해서 자동으로 빈 등록

(Configuration도 포함. Component가 이미 붙어있기 때문)

→ 자바에서는 annotation을 상속 받는다는 개념이 없음. 이는 스프링의 기능.

 

basePacakages로 검색 시작할 패키지 지정가능 → 이는 속도 향상(원래는 모든 자바 코드 뒤져봄)

디폴트는 ComponentScan이 붙은 설정 정보 클래스의 패키지.

→ 그래서 최상단 패키지에 AppConfig 같은 설정 정보를 넣고, ComponentScan을 붙임.

 

Configuration을 쓰지 않고, 직접 class에 Component를 붙임으로써 bean으로 등록이 가능함.

→ 그렇다면 필드에 bean을 주입하기 위해서는 어떻게?

→@Autowired를 쓰면됨.(근데 없이 그냥 생성자만 이용해도 되지 않나? → 뒤쪽에서 설명해주신다고 함.)

생성자가 한 개라면, 자동으로 주입됨.

 

@Component의 이름은 디폴트로 클래스명의 맨 앞글자는 소문자로 변경됨.

(물론 직접 정의 가능함)

 

빈 이름은 무조건 이름 다르게.

 

Component vs Configuration에서 bean으로 등록하는 경우

수동 등록 빈이 우선임.(후자) 오버라이딩 해버림. 최근 스프링 부트에서는 이를 에러 발생하게 함.

여담) 개발자는 애매하게 하지 말고 명확하게 해야함.


의존성 주입(DI)

 

생성자

호출 시점에 1번 실행됨. → 바뀌지 않도록 할 수 있음. (불변, 필수 일 경우 사용)

생성자가 1개라면, autowired 필요없음.

처음에 생성자로 주입이 되고, 그 뒤에 setter(수정자)로 다시 한번 주입이 됨.

(싱글톤이기 때문에 물론 같은 객체가 들어옴)

 

setter는 autowired에서 required false로 하여, 선택적으로 빈을 사용할 수 있게 할 수 있음.

필드에 autowired를 선언하여 넣을 수 있음.

→ 이는 추천하지 않음.

→외부에서 변경이 불가능하여 테스트하기 힘듦.(mock을 사용 못하나 봄?)

→ 그리고 스프링 컨테이너 없이 테스트할 시에, 아무런 값이 들어가지 않음.

→ 테스트 코드를 짤 때는 사용하는 편임 → 테스트할 때만 이용하니까 상관이 없음.

 

autowired 붙여서 일반 메서드에서도 주입가능함.

하지만 거의 사용 안함.

autowired required false를 하면 호출자체가 안됨.

null을 넣고 싶으면 Nullable을 하면, null이 들어감.

Optional<T>를 하면, Optional.empty() 가 됨.

특정 필드에만 nullable, optional해도 됨.

 

이 중에서, 생성자 주입을 선택해야 하는 이유는?

  1. 불변.

실행 되고는 바뀌지 않음.

테스트할 때에는 생성자를 통해서 다른 객체를 넣을 수 있어서 단위테스트에도 좋음.

생성자를 하면 코드가 많아짐.

롬복을 이용하면 됨 ㅎㅎ..(RequiredArgsConstructor)

(근데 세터나 게터는 주의가 필요하지 않나.)


조회할 빈이 2개 이상이라면?

에러 발생함. 그렇다고 하위 타입으로 직접 지정하면 DIP가 깨짐.

→ 파라미터로 들어오는 이름을 구체 클래스 이름으로 바꾸면 됨.(e.g discountPolicy → rateDiscountPolicy) 필드 이름을 바꿔도 되긴 함. 이 부분은 별로 추천하지 않음.(DIP를 결국엔 해치는 느낌)

 

@Qualifier로 bean에 추가 구분자 부여 가능.

빈에다가도 해주고, 파라미터/필드에도 하면 됨.

만약 해당하는 Qualifier 빈을 찾음.

@Primary로 우선순위 지정 가능

둘 중 우선순위는 qualifier가 높음.

Qualifier는 문자열이라서 컴파일 타임에는 체크가 안됨.

→ 이를 보완하기 위해 어노테이션 이용.

@Target({ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER, ElementType.TYPE, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
@Qualifier("hiBean")
public @interface CustomAnnotation {
}

하면 됨. 그리고 이 어노테이션을 이용하면 됨.

(스프링은 이러한 어노테이션 상속이 가능함. 순수 자바 기능 아님!)


모든 빈 가져오기

그냥 list로 선언하고 생성자 파라미터로 list를 넣어주면 됨.

Map으로 해도 됨. 이 때 key는 String이고 빈 이름임.

 

만약 특정 클래스를 빼고 싶다면?

이는 올마른 디자인이 아니라고 생각함.(개인적인 생각) 그래서 이러한 기능보다는 클래스 구조를 바꿔야할 것 같음.

 

자동 빈(@Component)으로 해도 OCP, DIP는 지켜짐.

그러면, 수동빈(@Configuration, @Bean) 등록은 언제?

→ 업무 로직 빈(repository, service, controller 등)에서는 자동 빈이 좋음. 빈들이 너무 많음.

→ 기술 지원 빈(db 연결, AOP, 공통 로그처리) 에서는 수동 빈. 이는 수가 적고 전체적인 어플리케이션에 광범위 영향을 미침.(security 설정 느낌인가)

 

비즈니스 로직에서 다형성을 이용할 때는 수동 빈이 좋을 수 있음.

여러 빈 가져올 경우.

→ List<CustomClass> 할 경우, CustomClass를 하나하나 전부 찾아봐야 함.

별도의 Config 클래스를 만들어서 처리하면, 어떠한 빈들이 등록되어 있는지 한눈에 확인할 수 있음.

 


스프링에서 의존관계 주입이 되면, 콜백 메서드를 통해서 초기화 시점을 알려줄 수 있음.

컨테이너 종료 전에도 콜백을 통해 알려줄 수 있음.

 

스프링 컨테이너 생성 → 빈 생성 → 의존 관계 주입 → 초기화 콜백 → 사용 → 소멸 전 콜백 → 스프링 종료

 

객체 생성과 초기화는 별도로 두는 것이 좋음.

생성자는 필수 정보를 받아와서 메모리를 할당하고 객체를 만드는 역할.

초기화는 이런 정보를 활용해서 무거운 작업(db 연결 등) 수행

db연결 같은 무거운 작업은 생성자보다는 별도의 메소드에서 하는 것이 유지보수하기 더 좋다고 함(왜? 이 부분은 잘 모르겠음)

 

InitalizingBean(초기화 콜백)에 있음. 이를 implements 하면 됨(afterPropertiesSet 오버라이딩).

DisposableBean(소멸 콜백). 이를 implements하면 됨.(destroy 오버라이딩)

단점은 외부 라이브러리에 적용하기 힘듦. 그리고 코드레벨에서 직접 수정해서 좀 그럼. 그리고 스프링에 의존적임.

지금은 이러한 방법 사용 거의 안함.

 

@Bean(initMethod = “메소드 이름”, destroyMethod = “메소드 이름”)

으로 가능함.

장점은 빈에 의존하지 않음. 코드를 고칠 수 없는 외부라이브러리에도 적용 가능.

디폴트 값은 (inferred) 임.

외부 라이브러리 대부분 close, shutdown을 이용함. 이러한 이름이 있다면 자동으로 호출해줌.

 

메소드에 @PostContruct, @PreDestroy 이용하면 됨 그냥. 이 방법을 추천한다고 함. 스프링에 종속적인 기능이 아님.(패키지 보면 좀 다름)

단점은 외부 라이브러리에 사용 못함. 코드를 수정해야 하기 때문에.


빈 스코프

빈은 기본적으로 싱글톤으로 만들어짐. 컨테이너의 시작부터 종료까지 살아있음.

프로토타입은 빈 만들어주고 의존관계까지만 주입해주고 컨테이너가 더 이상 관리 안함.

 

웹 관련도 있음

request : 요청 들어오고 나갈 때까지 유지.

session : 세션 동안.

application : 서블릿 컨텍스트(?)와 같은 범위

 

싱글톤은 빈을 요청하면 이미 만들어둔 객체를 반환함.

 

프로토타입은 요청하면 새로운 객체를 만들고 이를 줌. → 컨테니너가 관리를 안하니까 이미 존재하는 객체를 줄 수가 없음. → 초기화까지만 처리하기 때문에 PreDestroy가 발생하지 않음. 있어도 호출을 안함. 종료는 해당 빈을 이용하는 객체(클라이언트)가 함.

 

싱글톤 빈에서 프로토타입 빈을 사용하게 되면?

싱글톤 빈은 생성 시점에 주입을 받음. 그래서 생성 시점에만 프로토타입 빈을 새로 만들게 됨.

사용할 때는 이미 만들어진 프로토타입 빈을 사용하게 됨. (물론 요청하면 새로 만들겠지만.)

 

사용할 때마다 새로운 빈을 주입받고 싶다면?

쉬운 방법은 사용할 때마다 컨테이너(ApplicationContext 같은)에 프로토타입 빈을 요청하면 됨.

→ 하지만 이 방법은 지저분함. 컨테이너에 종속적임.

→ 프로토타입 빈을 컨테이너에서 찾아주는 역할이 필요함(DL, dependency lookup, 의존관계 조회)

 

Provider를 이용하면 됨.

ObjectProvider<프로토타입 빈 클래스>ObjectFactory<프로토타입 빈 클래스> 이용하면 됨.

필드에 추가하고, 의존성 주입 받으면 됨. 그리고 실제 로직에서 Ojbect~~.getObject() 하면 빈 가져옴. → 스프링 컨테이너를 통해 빈을 찾음 → 프로토타입 빈이기 때문에 새로 만들어서 가져오게 됨. → 단위테스트나 mock이용하기 편해짐.

factory에 편의 기능 더한 것이 provider. stream 처리 등 추가됨. 둘 다 스프링에 의존적.

 

스프링에 의존하지 않는 새로운 기술인 javax.inject.Provider 등장함.(자바 표준)

.get()호출하면 컨테이너에서 찾아서 빈 가져옴.(프로토타입이라 새로 만들게 됨).

별도의 라이브러리가 필요함.

 

 

프로토타입 빈은 언제 사용?

사실 거의 사용 안함.

순환참조 같은 걸 방지할 수 있고 lazy하게 가져오기 가능.(코드에 주석으로 직접 설명이 되어 있음)

 

웹 빈

스코프가 request 같은 빈을 의존 관계 주입해야 할 경우.

만약 Controller나 Service 같은 bean에서 생성자를 통해 해당 request 빈을 주입하려고 하면?

→ 에러 발생. → 사용자가 요청을 해야만, 빈이 생성되기 때문에 컨테이너에서 해당 빈을 찾지 못해서 에러가 발생함.

이는 앞처럼 Provider로 해결가능

(프로토타입은 에러는 발생 안했음. 싱글톤 빈에서 주입할 때, 컨테이너에서 새로운 빈 만들어서 주입하기 때문에. 반면 request는 사용자 요청이 와야지만 만들어져서 이러한 에러가 발생함)

 

Provider가 getObject()를 할 때 bean이 만들어짐(lazy)

그리고 같은 요청이라면, Service나 Controller 모두 같은 객체 이용함.

 

이러한 빈 사용 이유는?

컨트롤러처럼 웹에 의존적인 부분은 이러한 빈을 사용함으로써 service같은 다른 계층으로 웹 정보가 넘어가지 않게 됨. 즉 유지보수하기 쉬워짐.

 

Provicer 대신에 @Scope(proxyMode = ScopeProxyMode.TARGET_CLASS 또는 INTERFACE) 이용하면 됨. (프로토타입도 가능한가?)

이는 진짜가 아닌 가짜를 주입해줌. 실제 객체를 이용할 경우 진짜 빈을 찾아서 작동하게 됨(JPA의 lazy 느낌) 스프링의 CGLIB라는 바이트 코드를 조작하는 라이브러리를 이용하여 이를 가짜를 만듦.

프록시 객체는 싱글톤임.

→ 하지만 내부적으로는 빈이 전부 다르게 생성이 됨. 실제 객체는 범위를 따름. 싱글톤이 아님.주의해야 함.

웹 스코프가 아니여도 프록시 객체 사용가능 하다고 함.

→ 이는 클라이언트 코드를 고치지 않음 → 유지보수하기 매우 좋아짐. → 다형성과 DI 컨테이너가 가진 장점임.

 

 

느낀 점

객체지향을 잘 알아야지만 스프링의 본래 기능을 사용할 수 있을 것 같음.

깔끔한 코드나 느슨한 결합을 하기 위해 좋은 디자인이 필요.

 

강의하시는 분이 정말 대단한 것 같음. 스프링을 왜 사용하고 이에 대해 설명해주는 것이 좋았음. 특히 스코프에서 특별한 스코프를 가진 빈을 사용하는 이유도 설명해주고, 기존 코드를 구현하고 문제점을 보완하기 위해 스프링을 이용하는 것이 인상깊었음.

반응형

async하게 작동하기 위해서 자바는 여러 가지 기술을 지원해줍니다.

 

future나 callback이 있고, 최근에는 reactor가 있습니다.

 

Callback은 우선 return 값이 없습니다.

Callback의 문제점은 callback hell 입니다. callback 깊이가 깊어질수록 indent가 깊어져서 코드를 이해하기 어렵게 됩니다.

userService.getFavorites(userId, new Callback<List<String>>() { //callback inteface

	//성공하면 이거 호출?
  public void onSuccess(List<String> list) { 
    if (list.isEmpty()) { 
			//리스트가 비었으면, 여기서 다른 서비스의 콜백 호출함.
      suggestionService.getSuggestions(new Callback<List<Favorite>>() {
        public void onSuccess(List<Favorite> list) { 
          UiUtils.submitOnUiThread(() -> { 
            list.stream()
                .limit(5)
                .forEach(uiList::show); 
            });
        }

        public void onError(Throwable error) { 
          UiUtils.errorPopup(error);
        }
      });
    } else {
      list.stream() 
          .limit(5)
					//여기서도 다른 callback 호출. 너무 복잡함.
          .forEach(favId -> favoriteService.getDetails(favId, 
            new Callback<Favorite>() {
              public void onSuccess(Favorite details) {
                UiUtils.submitOnUiThread(() -> uiList.show(details));
              }

              public void onError(Throwable error) {
                UiUtils.errorPopup(error);
              }
            }
          ));
    }
  }

	//실패하면 이거?
  public void onError(Throwable error) {
    UiUtils.errorPopup(error);
  }
});

이를 reactor로 바꾸면?

userService.getFavorites(userId) 
            //detail들 가져옴. Flux<Details> 인 것 같음.
           .flatMap(favoriteService::getDetails)
            //만약 비었다면? suggestion을 가져옴.
           .switchIfEmpty(suggestionService.getSuggestions()) 
            //5개만?
           .take(5) 
            //이 뒤 쪽은 다른 thread에서 처리하겠다는 뜻.(이 부분이 없으면 main thread가 이를 처리함)
           .publishOn(UiUtils.uiThreadScheduler()) 
            //보여줌.
           .subscribe(uiList::show, UiUtils::errorPopup);

매우 간단해집니다.

stream이랑 비슷하다고 생각할 수 있습니다.

하지만 stream은 reactor와 달리 subscriber가 여러 개 있을 수 없다고 합니다.\

 

그 밖에도 backpressure 같은 기능을 제공해줍니다.

 

Future의 문제점은 get() 같은 함수를 호출하게 되면 그 부분에 blocking이 걸립니다.\

 

또한 lazy computation을 지원하지 않습니다.

이는 당장 필요하지 않는 부분은 나중에 계산 하는 것을 의미합니다.

예를 들면, JPA에서 프록시로 lazy fetch를 통해 그 값을 사용할 때 데이터를 가져옵니다.

그리고 a and b 같은 경우에는 a가 false인 경우에는 b를 계산하지 않습니다.

(future에서는 지원하지 않는다고 하는 이유는 모르겠습니다. 써본 적이 없어서....)

 

CompletableFuture<List<String>> ids = ifhIds(); 

CompletableFuture<List<String>> result = ids.thenComposeAsync(l -> { 
	Stream<CompletableFuture<String>> zip =
			l.stream().map(i -> { 
				CompletableFuture<String> nameTask = ifhName(i); 
				CompletableFuture<Integer> statTask = ifhStat(i); 

				return nameTask.thenCombineAsync(statTask, (name, stat) -> "Name " + name + " has stats " + stat); 
			});
	List<CompletableFuture<String>> combinationList = zip.collect(Collectors.toList()); 
	CompletableFuture<String>[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]);

	CompletableFuture<Void> allDone = CompletableFuture.allOf(combinationArray); 
	return allDone.thenApply(v -> combinationList.stream()
			.map(CompletableFuture::join) 
			.collect(Collectors.toList()));
});

List<String> results = result.join(); // 이 부분에서 다른 작업들이 끝날 떄 까지 기다림
assertThat(results).contains(
		"Name NameJoe has stats 103",
		"Name NameBart has stats 104",
		"Name NameHenry has stats 105",
		"Name NameNicole has stats 106",
		"Name NameABSLAJNFOAJNFOANFANSF has stats 121");

reactor는 매우 깔끔합니다.

Flux<String> ids = ifhrIds(); 

Flux<String> combinations =
		ids.flatMap(id -> { 
			Mono<String> nameTask = ifhrName(id); 
			Mono<Integer> statTask = ifhrStat(id); 

			return nameTask.zipWith(statTask, 
					(name, stat) -> "Name " + name + " has stats " + stat);
		});

Mono<List<String>> result = combinations.collectList(); 

List<String> results = result.block(); //끝날 때 까지 기다림?
assertThat(results).containsExactly( 
		"Name NameJoe has stats 103",
		"Name NameBart has stats 104",
		"Name NameHenry has stats 105",
		"Name NameNicole has stats 106",
		"Name NameABSLAJNFOAJNFOANFANSF has stats 121"
);

 

flatMap vs flatMapSequentail vs concatMap

 

flatMap은 eagerly subscribing 이고, async입니다.

하지만 순서는 보장해주지 않습니다.

그리고 원소들을 즉시 반환합니다. 즉, subcriber의 작업이 끝나면 해당 원소를 바로 반환합니다.

 

flatMapSequentail은 flatMap에서 순서를 보장해줍니다.

 

concatMap은 앞의 두 개와 다르게 egaer하지 않습니다. 하나의 데이터가 들어오면, 데이터와 결과를 페어로 반환해줍니다.(똑같이 async입니다.)

 

즉,

publish 1

sub 1

publish 2

sub 2 

...

처럼 이런 식으로 반환하게 됩니다.

 

 

 

 

참고:

https://stackoverflow.com/questions/71971062/whats-the-difference-between-flatmap-flatmapsequential-and-concatmap-in-project

 

Whats the difference between flatMap, flatMapSequential and concatMap in Project Reactor?

I've read from the documentation that flatMap: Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux through merging, w...

stackoverflow.com

https://projectreactor.io/docs/core/release/reference/

https://stackoverflow.com/questions/52820232/difference-between-infinite-java-stream-and-reactor-flux

반응형

현재 진행하고 있는 프로젝트가 있습니다. 여기서 경매글을 등록해야 하는데, 이 부분이 html로 이루어져 있습니다.

html로 구현을 했기 때문에 XSS를 방지할 필요성이 있습니다.

 

XSS를 간단하게 설명하면 html의 script, iframe 태그 등을 이용하여 해커가 원하는 코드를 넣을 수 있습니다.

그렇기 때문에 이런 태그들을 허용하면 안 됩니다. 그래서 jsoup을 이용해서 파싱 했습니다.

 

@Override
public String processContent(String content, List<MultipartFile> files) {
    assertNotXSS(content);
    ...
}
private void assertNotXSS(String content) {
    String tar = Jsoup.clean(content, BASE_URL, Safelist.relaxed().preserveRelativeLinks(true));

    if (!tar.equals(content)) {
        throw new AuctionException(AuctionExceptionType.INVALID_AUCTION_CONTENT);
    }
}

이런 식으로 하면 파싱이 가능합니다.

 

파싱이 된 string과 입력으로 온 string이 다른 경우 예외가 발생하게 됩니다.

 

아래는 테스트 코드입니다.

@Test
@DisplayName("XSS 방지 테스트")
public void XSSTest() throws IOException {
    List<MultipartFile> imgs = getFiles(imgPath);
    String content = "<script>console.log(\"hi\");</script></h1><img src=\"1.png\"><img src=\"2.jpg\">";
    assertThrows(AuctionException.class,()->auctionContentHandler.processContent(content,imgs));
}

이처럼 script 태그가 있을 경우에 예외가 발생합니다.

성공적으로 처리하는 것을 알 수 있습니다.(결과를 보여드리기 위해서 파싱하고 print 하게 했습니다. 원래 코드에서는 print 하지 않습니다.)

 

@Test
@DisplayName("정상적으로 처리하는 지 테스트")
public void test() throws IOException {
    List<MultipartFile> imgs = getFiles(imgPath);

    String content = "<h1>\"asdf\"</h1><img src=\"1.png\"><img src=\"2.jpg\">";
    List<String> originalAttr = getHtmlImgAttr(content);

    ret = auctionContentHandler.processContent(content, imgs);
    List<String> afterAttr = getHtmlImgAttr(ret);


    for (int i = 0; i < originalAttr.size(); i++) {
        System.out.printf("%s -> %s%n", originalAttr.get(i), afterAttr.get(i));
        File changedFile = new File(CHANGE_IMG_PATH + "/" + afterAttr.get(i));
        assertTrue(changedFile.exists());
        assertNotEquals(originalAttr.get(i), afterAttr.get(i));
    }
    System.out.println(content);
    System.out.println(ret);
}

이는 정상적인 경매내용이 들어올 경우 테스트합니다.

클라이언트로부터 이미지 파일과 html이 입력으로 오면, 로컬에 파일을 저장합니다.(이 때 파일 이름은 UUID를 이용합니다.)

그리고 html의 img 태그의 attribute를 저장한 파일 이름으로 변경해 줍니다.

 

결과입니다. 실제로 파일이름이 변경되었고, 이 파일이 존재하는지 테스트했을 때 통과했습니다.

jsoup을 이용하여 string을 document로 만들면 <html> <head> <body>가 붙네요.

 

 

 

 

테스트 코드를 이렇게 작성하는 것이 맞는 건지 잘 모르겠네요. 너무 특정 기술에 의존하고 있어서 걸리네요.

만약 html이 아니라 md로 바꾼다거나 아니면 이미지를 로컬에 파일로 저장하지 않고 다른 곳에 저장하게 되면 테스트 코드의 변경이 일어나게 됩니다.

좋은 테스트 코드 작성이 진짜 어렵네요.

반응형

최근에 회사에서 푸시 시스템 관련 일을 하던 중에, RDS의 connection pool이 모자란 현상이 발생했습니다.

kafka를 이용하는 시스템에서 prodcuer 2대 consumer 2대를 이용하니, 3대까지 연결은 되는데 마지막 한 대가 연결이 안되는 에러가 발생했습니다. 이는 rds의 connection pool을 넘어서 그렇습니다. 그래서 producer의 connection pool을 4개로 줄여서 해결했습니다.

 

 

진짜로 해결이 된 것일까요?

사실 connection pool을 감소시킨 producer보다는 consumer부분이 더 신경쓰였습니다. 왜냐하면 consumer가 batch 단위로 데이터를 받아오고 각각의 데이터 검사를 위해 thread를 이용합니다. 즉, batch 크기 만큼 thread가 생기고 만약 db를 조회하면 connection이 일어나게 되니 connection pool이 부족하여 deadlock이 발생할 수 있다고 생각했습니다.

 

 

그래서 한 번 connection pool을 1로 설정하고 실험해봤습니다.

 

Take out 1 messages라는 로그만 남고, 멈춰버렸습니다. 그러더니 나중에 에러가 발생합니다.

 

@Override
public NotificationSettings findNotificationSettingsByDevice(String device) {
    Long notificationId = jpaFCMDeviceRepository.findNotificationIdByDevice(device).orElseThrow();

    return jpaNotificationRepository.findNotificationById(notificationId).orElseThrow(() -> new NullPointerException("Not Exist NotificationSettings for device"));
}

이 부분을 이용하는데, 보면 쿼리가 2개라서 커넥션이 2개가 필요한 줄 알았습니다.

 

그래서 2개로 설정하고 다시 해봤습니다.

push도 성공했다고 이번엔 잘 됩니다. 물론 지금은 데이터가 하나 밖에 없기 때문에 그렇습니다. 만약 데이터가 많아 진다면, thread 개수가 증가하여 여전히 deadlock이 걸립니다.

 

connection이 끝나자마자 반환하면, pool이 1개여도 deadlock이 발생하지 않을 것이라고 생각했습니다. 위 코드에서는 현재 그렇게 하지 못하고 있습니다.

 

직관적으로 이 method를 호출하는 상위 method의 @Transacntional 때문일 것이라고 생각하고 없애고 connection pool 1개로 실행해봤습니다. 결과는 아까와 달리 푸시에 성공했다고 뜹니다.

 

 

왜 이런 지 궁금해서 알아봤습니다.

제 전체 코드입니다.

@Override
@Transactional(readOnly = true)
@KafkaListener(topics = "${spring.kafka.fcm.topic}", groupId = "${spring.kafka.fcm.group-id}")
public void distributeMessages(List<Message> messages){
    log.info(String.format("Take Out %d messages.", messages.size()));
    int messageCount = messages.size();
    Thread[] threads = new Thread[messageCount];


    //thread 지우면, connection pool 하나만 이용.
    for(int i = 0; i < messages.size(); i++){
        Message message = messages.get(i);
        threads[i] = new Thread(() -> {

            if(isValidMessage(message)) {
                messenger.deliverMessage(message);
            }
        });
        
        threads[i].start();
    }

    for(int i = 0; i< messageCount; i++){
        try {
            threads[i].join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

여기서 isValidMessage가

 

public boolean isDestroy(Message message) {
        NotificationSettings deviceNotification = fcmDeviceRepository.findNotificationSettingsByDevice(message.getReceiveDevice());
        NotificationType messageNotificationType = message.getNotificationType();

        return !deviceNotification.isNotification(messageNotificationType);
}


@Override
public NotificationSettings findNotificationSettingsByDevice(String device) {
    Long notificationId = jpaFCMDeviceRepository.findNotificationIdByDevice(device).orElseThrow();

    return jpaNotificationRepository.findNotificationById(notificationId).orElseThrow(() -> new NullPointerException("Not Exist NotificationSettings for device"));
}

isDestroy를 호출합니다.

Transactional이 있는 상위 method에서 쓰레드를 만들고 isDestroy를 호출하게 됩니다. 여기서 문제가 발생합니다.

그래서 어디가 문제인지 살펴봤습니다.

 

 

 

우선 단순한 반복문으로 바꿔서 테스트해봤습니다.

connection pool이 1개여도 정상적으로 실행됩니다.

 

검사는 for문으로, 메시지를 전달하는 부분은 multi thread로 했습니다. 이 부분은 db조회가 없습니다.

@Transactional
public void distributeMessages(List<Message> messages){
        log.info(String.format("Take Out %d messages.", messages.size()));
        int messageCount = messages.size();

        List<Message> validMessages = new ArrayList<>();


        //thread 지우면, connection pool 하나만 이용.
        for(int i = 0; i < messages.size(); i++){
            Message message = messages.get(i);


                if(isValidMessage(message)) {
                    validMessages.add(message);
                }
                else{
                    messageCount-=1;
                }


        }

        Thread[] threads = new Thread[messageCount];

        for(int i = 0; i< messageCount; i++){
            Message message = validMessages.get(i);
            threads[i] = new Thread(() -> {
                    messenger.deliverMessage(message);
                });

            threads[i].start();
        }

        for(int i = 0; i< messageCount; i++){
            try {
                threads[i].join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

이 코드는 deadlock 없이 정상적으로 작동합니다.

즉, multi thread를 할 때, thread 안에서 db를 조회하면 해당 문제가 발생하는 것을 발견했습니다.

 

그래서 @Transactional의 범위를 알아봤습니다.

 

알아보니, single thread에서만 적용이 된다고 합니다. 

 

그래서 상위 method의 transactional을 빼고, 하위 method에 설정했습니다.

@Override
@Transactional // 추가
public boolean isDestroy(Message message) {
    NotificationSettings deviceNotification = fcmDeviceRepository.findNotificationSettingsByDevice(message.getReceiveDevice());
    NotificationType messageNotificationType = message.getNotificationType();

    Account account = jpaAccountRepository.findByEmail("tt");
    account.setEmail("ttt");

    jpaAccountRepository.save(account);

    if(true)
        throw new RuntimeException();

    return !deviceNotification.isNotification(messageNotificationType);
}
@Override
// @Transactional(readOnly = true)
@KafkaListener(topics = "${spring.kafka.fcm.topic}", groupId = "${spring.kafka.fcm.group-id}")
public void distributeMessages(List<Message> messages){
    log.info(String.format("Take Out %d messages.", messages.size()));
    int messageCount = messages.size();
    Thread[] threads = new Thread[messageCount];


    for(int i = 0; i < messages.size(); i++){
        Message message = messages.get(i);
        threads[i] = new Thread(() -> {

            if(isValidMessage(message)) {
                messenger.deliverMessage(message);
            }
        });

        threads[i].start();
    }

    for(int i = 0; i< messageCount; i++){
        try {
            threads[i].join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
    System.out.println(jpaAccountRepository.findByEmail("ttt"));
    System.out.println(jpaAccountRepository.findByEmail("tt"));
}

이렇게 하면 

tt가 ttt로 바뀌지 않는 것을 확인할 수 있습니다. rollback이 되었습니다.

 

반대로 아까처럼 상위 method에만 @Transactional을 하면 적용이 되지 않는 것을 알 수 있습니다.

 

 

기존 코드 isDestroy는 transaction이 아니고, 별개로 작동한 것입니다. isDestroy에서 db를 조회하기 위해 connection이 한 개 필요하고, 상위 method가 Transaction이라서 connection이 한개가 필요합니다. 그래서 총 2개가 필요했었습니다.

(알아보니, 쿼리당 connection이 한 개가 필요한 것이 아니라, transaction을 실행할 때 connection이 일어나고 끝나면 반환합니다.)

 

Trasaction때문에 이미 connection 을 하나 이용하고 있는 상태에서 thread가 생성되고 여기서 connection을 기다리게 됩니다. 하지만 상위 method는 join으로 현재 thread가 끝날 때까지 기다리게 되고, connection을 계속 가지고 있게 되어 deadlock 이 발생합니다.

 

결론

 

이를 해결하기 위해서는

@Override
@Transactional(readOnly = true)
@KafkaListener(topics = "${spring.kafka.fcm.topic}", groupId = "${spring.kafka.fcm.group-id}")
public void distributeMessages(List<Message> messages){
        log.info(String.format("Take Out %d messages.", messages.size()));
        int messageCount = messages.size();

        List<Message> validMessages = new ArrayList<>();


        for(int i = 0; i < messages.size(); i++){
            Message message = messages.get(i);

                if(isValidMessage(message)) {
                    validMessages.add(message);
                }
                else{
                    messageCount-=1;
                }
        }

        Thread[] threads = new Thread[messageCount];

        for(int i = 0; i< messageCount; i++){
            Message message = validMessages.get(i);
            threads[i] = new Thread(() -> {
                    messenger.deliverMessage(message);
                });

            threads[i].start();
        }

        for(int i = 0; i< messageCount; i++){
            try {
                threads[i].join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
  }

========================================================================================

@Override
//@Transactional(readOnly = true) 부모 메소드에 있어서 없어도 됨.
public boolean isDestroy(Message message) {
    NotificationSettings deviceNotification = fcmDeviceRepository.findNotificationSettingsByDevice(message.getReceiveDevice());
    NotificationType messageNotificationType = message.getNotificationType();


    return !deviceNotification.isNotification(messageNotificationType);
}

for문을 이용하여 검사하면 됩니다. 하지만 속도 측면에서 아쉽습니다.

 

@Override
// @Transactional(readOnly = true) 필요 없음.
@KafkaListener(topics = "${spring.kafka.fcm.topic}", groupId = "${spring.kafka.fcm.group-id}")
public void distributeMessages(List<Message> messages){
    log.info(String.format("Take Out %d messages.", messages.size()));
    int messageCount = messages.size();
    Thread[] threads = new Thread[messageCount];


    //thread 지우면, connection pool 하나만 이용.
    for(int i = 0; i < messages.size(); i++){
        Message message = messages.get(i);
        threads[i] = new Thread(() -> {

            if(isValidMessage(message)) {
                messenger.deliverMessage(message);
            }
        });
        
        threads[i].start();
    }

    for(int i = 0; i< messageCount; i++){
        try {
            threads[i].join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

========================================================================================
@Override
@Transactional // 필요
public boolean isDestroy(Message message) {
    NotificationSettings deviceNotification = fcmDeviceRepository.findNotificationSettingsByDevice(message.getReceiveDevice());
    NotificationType messageNotificationType = message.getNotificationType();


    return !deviceNotification.isNotification(messageNotificationType);
}

이는 하위 method에 transaction을 달아주면 됩니다. 그러면 1개 만으로도 충분히 deadlock없이 가능합니다. 다른 thread들이 connection을 사용하고 있는 thread가 종료될 때 까지 기다려야 하기 때문에, connection pool이 충분하지 않으면 속도가 느립니다.

반응형

이번에는 두 대의 서버와 local kafka를 이용하여 실험해 본다.

 

kafka는 docker를 이용하여 실행한다.

 

producer

@Async
//항상 1 전송
public void prodcue1(String mm){
    kafkaTemplate.send("fcm",Integer.toString(new Random().nextInt()), mm);
    System.out.println(String.format("pro %s %s", mm, Thread.currentThread().getId()));
}

@Async
//항상 2 전송
public void produce2(String mm){
    kafkaTemplate.send("fcm", Integer.toString(new Random().nextInt()), mm);
    System.out.println(String.format("pro %s %s", mm, Thread.currentThread().getId()));
}

 

Consumer

//Server1
@KafkaListener(topics = "fcm", groupId = "fcm")
public void listen(ConsumerRecords<String, String> mm){
    for(ConsumerRecord<String, String> c : mm) {
        System.out.println(String.format("listen-1 partition : %s value : %s threadId :%s", c.partition(), c.value(),Thread.currentThread().getId()));
    }
}


//Server2
@KafkaListener(topics = "fcm", groupId = "fcm")
public void listen(ConsumerRecords<String, String> mm){
    for(ConsumerRecord<String, String> c : mm) {
        System.out.println(String.format("listen-2 partition : %s value : %s threadId :%s", c.partition(), c.value(),Thread.currentThread().getId()));
    }
}

 

우선 parition이 1개인 kafka와 서버 한 대를 이용하여 테스트한다.

당연하게도  server1의 하나의 consumer가 전부 읽는다.

 

다음에는 서버 한 개를 추가한다. consumer 설정은 위의 코드와 동일하다.

 

server1

server1이다. 하나의 서버로 돌릴 때와 큰 차이가 없다.

 

 

server2

server2에서는 consume이 안되는 것을 확인할 수 있다. 그 이유는 같은 그룹에서는 하나의 partition에 하나의 consumer만 접근할 수 있기 때문이다. 그룹이 partition에 대한 offset을 관리하기 때문에 이러한 현상이 일어난다.

 

 

다음에는 이 상태에서 터미널을 이용하여 kafka에 partition을 하나 추가한다.

server1

 

server2

이처럼 rebalancing이 일어난 것을 확인할 수 있다. 직접 kafka에 partition을 추가했을  뿐인데, 알아서 처리한다. 맨 마지막 줄을 주목하자.

 

결과는?

server1
server2

위에서 보다시피 server1에서는 partition 0에 관해서만 처리하고, server2에서는 1에 대해서만 처리한다.

 

갑자기 궁금해서 partition을 3개까지 늘려봤다.

server1
server2

보면 server1의 하나의 consumer가 2개의 partition에 대해서 처리하도록 바뀐 것을 확인할 수 있다. 이 rebalancing 되는 원리가 궁금하다. 나중에 시간이 되면 오픈소스를 확인해봐야겠다.

 

이번엔 server2를 종료시켜봤다.

server1

rebalancing이 일어나고,

server1

server1의 하나의 consumer가 전부 이를 처리한다.

 

다음에는 server2를 다시 작동시키고 그룹은 다른 그룹으로 변경한다. 현재 offset 읽는 option을 latest로 설정해서, 새로운 메시지를 읽지 않는다.

 

그래서 earliest로 바꾼 후에, 또 다른 그룹으로 바꾸고 실행해보았다.

server2

이전에 넣은 메시지들을 전부 들고와서 처리하는 것을 확인할 수 있다. 그리고 모두 단일 thread로만 처리한다.

 

그리고 kafka에서 topic을 삭제했다. 그러면, consumer에서 계속 에러가 발생한다. offset을 읽을 수 없다고.

무한 루프도는 것처럼 계속 나와서 이에 대해서도 처리를 해줄 필요가 있을 것 같다.

 

그리고 producer를 이용해 메시지를 보내면, topic이 저절로 생긴다. 이는 kafka 내부 옵션으로 자동으로 생성되지 않도록할 수 있다.

 

 

앞에서는 concurrency설정을 1로 했을 경우이다.

 

이번에는 2로 설정해서 해본다.

 

서버 하나, parition 하나

partition이 하나이기 때문에, cocurrency를 2로 해도 차이가 없는 것을 확인할 수 있다.(하나의 partition에는 같은 그룹의 consumer 하나만)

 

partition을 하나 더 추가한다.

server1

server1에서 두 개의 consumer를 이용하여 처리한다. 또한 consumer1(threadId 55)는 partition 0만, consumer2는 partition1만 처리한다.

 

또 partition을 하나 더 추가한다. 총 3개의 partition이 있다.

server1

consumer는 2개만 작동하고, consumer1은 0,1을 처리하고 consumer2는 2를 처리하는 것을 확인할 수 있다.

 

 

다음에는 서버2의 concurrency는 1로 설정해서 실행해본다.

server1
server2

server2의 consumer에게 하나의 partition이 할당된 것을 확인할 수 있다.

 

server2의 concurrency를 2로 설정하고 다시 실행한다.

server1
server2

server1에서 consumer2가 놀게 되고, server2는 모두 작동하는 것을 확인할 수 있다.

여기서 server1의 consumer2가 자원을 얼마나 차지하는 지 궁금하다.  나중에 꼭 소스코드를 봐야겠다.

 

앞에서는 모두 producer에서 send할 때 key값을 랜덤하게 줬다. 그렇게 한 이유가 key 가 없이 하면, 다른 partition에 데이터가 들어가지 않아서 그렇게 했었다.

 

이에 대해서 조사해보니, producer의 partitioner기본 전략은 sticky 이다. sticky는 batch(byte)가 꽉 차면,  다른 partittion의 batch에 데이터를 쌓게 된다. 또한 linger.ms라는 것도 있는데, linger.ms를 넘어가면 자동으로 partition에 batch 데이터를 보낸다. linger.ms의 기본값은 0이고, batch는 조금 크다. 그래서 key없이 하면, 하나의 partition에만 데이터가 들어갔었다.

 

 

그래서 key부분을 지우고, produce1의 batch가 1이 되면 바로 전송하도록 수정하고 실행해봤다. 이는 하나의 서버에 두 개의 method를 만들어서 실행한다.

@Async
//항상 1전송
public void prodcue1(String mm){
    kafkaTemplate.send("fcm", mm);
    System.out.println(String.format("pro %s %s", mm, Thread.currentThread().getId()));
}

@Async
//항상 2전송
public void produce2(String mm){
    kafkaTemplate2.send("fcm", mm);
    System.out.println(String.format("pro %s %s", mm, Thread.currentThread().getId()));
}

 

 

server1

결과를 보면, 2가 partition 0에는 들어가지 않는 것을 확인할 수 있다. 이는 produce2의 batch가 크기 때문에 partition 1에만 계속 2를 넣는다. partition 0에 2를 넣고 싶어도, batch가 차기 전에 보내버리기  때문에 다음 partition의 batch에 데이터를 넣을 수 없다.

 

하지만 1은 partition 0,1 두 곳 모두에서 존재하는 것을 확인할 수 있다.

 

 

 

 

 

* 혼자 블로그나 공식 문서를 보고 공부하는 것이기 때문에 틀리는 것이 있을 수도 있습니다.

반응형

parition, consumer, producer 가 정확히 어떻게 작동하는 지 궁금하여 테스트를 해보게 되었다.

 

Spring test와 @EmbeddedKafka를 이용하여 테스트 했다.

 

 

이는 producer이다. key는 random으로 아무런 값이나 보내게 된다.

@Async
//항상 1 전송
public void prodcue1(String mm){
    kafkaTemplate.send("fcm",Integer.toString(new Random().nextInt()), mm);
    System.out.println(String.format("pro %s %s", mm, Thread.currentThread().getId()));
}

@Async
//항상 2 전송
public void produce2(String mm){
    kafkaTemplate.send("fcm", Integer.toString(new Random().nextInt()), mm);
    System.out.println(String.format("pro %s %s", mm, Thread.currentThread().getId()));
}

여기서 궁금한 점은, 비동기로 작동하는 producer 2개가 같은 parition에 접근하는 경우 데이터가 정상적으로 들어가는지이다.

 

이는 producer가 작동하는 과정이다.

for(int i = 0; i < 20; i++){
	if(i%2==0) producer.produce1("1");
    else 	   producer.produce("2");
 }

 

결과는 다음과 같다.

pro 1 139
pro 1 133
pro 2 138
pro 2 136
pro 1 135
pro 1 137
pro 2 134
pro 2 136
pro 1 135
pro 2 133
pro 1 138
pro 2 140
pro 2 133
pro 1 135
pro 2 136
pro 1 134
pro 1 139
pro 2 137
pro 2 140
pro 1 138

1 10번, 2 10번 올바르게 잘 보내고 있다. 

 

아래는 consumer 설정이다.

//Consumer1.java
@KafkaListener(topics = "fcm", groupId = "fcm")
public void listen(ConsumerRecords<String, String> mm){
    for(ConsumerRecord<String, String> c : mm) {
        System.out.println(String.format("listen-1 partition : %s value : %s threadId :%s", c.partition(), c.value(),Thread.currentThread().getId()));
    }
}


//Consumer2.java
@KafkaListener(topics = "fcm", groupId = "fcm")
public void listen(ConsumerRecords<String, String> mm){
    for(ConsumerRecord<String, String> c : mm) {
        System.out.println(String.format("listen-2 partition : %s value : %s threadId :%s", c.partition(), c.value(),Thread.currentThread().getId()));
    }
}

동일한 주제와 동일한 그룹으로 메시지를 consume한다.

그리고 메세지가 온 parition과 메시지 값, 그리고 메시지의 threadId(consumer)를 확인한다.

 

같은 partition에 2개의 consumer가 존재할 경우에는 다음과 같이 작동한다.

listen-1 partition : 0 value : 2 threadId :127
listen-1 partition : 0 value : 2 threadId :127
listen-1 partition : 0 value : 2 threadId :127
listen-1 partition : 0 value : 1 threadId :127
listen-1 partition : 0 value : 1 threadId :127
listen-1 partition : 0 value : 1 threadId :127
listen-1 partition : 0 value : 2 threadId :127
listen-1 partition : 0 value : 1 threadId :127
listen-1 partition : 0 value : 1 threadId :127
listen-1 partition : 0 value : 2 threadId :127
listen-1 partition : 0 value : 2 threadId :127
listen-1 partition : 0 value : 1 threadId :127
listen-1 partition : 0 value : 2 threadId :127
listen-1 partition : 0 value : 1 threadId :127
listen-1 partition : 0 value : 1 threadId :127
listen-1 partition : 0 value : 2 threadId :127
listen-1 partition : 0 value : 1 threadId :127
listen-1 partition : 0 value : 2 threadId :127
listen-1 partition : 0 value : 1 threadId :127
listen-1 partition : 0 value : 2 threadId :127

보면, consumer 하나만 작동하여 메시지를 출력하는 것을 확인할 수 있다. 즉, Consumer2는 아예 작동을 하지 않는다.

 

partition이 2개로 늘어난 다면?

listen-1 partition : 0 value : 2 threadId :127
listen-2 partition : 1 value : 2 threadId :129
listen-2 partition : 1 value : 1 threadId :129
listen-1 partition : 0 value : 1 threadId :127
listen-2 partition : 1 value : 1 threadId :129
listen-1 partition : 0 value : 1 threadId :127
listen-2 partition : 1 value : 2 threadId :129
listen-1 partition : 0 value : 2 threadId :127
listen-2 partition : 1 value : 2 threadId :129
listen-1 partition : 0 value : 2 threadId :127
listen-2 partition : 1 value : 1 threadId :129
listen-1 partition : 0 value : 1 threadId :127
listen-2 partition : 1 value : 2 threadId :129
listen-1 partition : 0 value : 2 threadId :127
listen-1 partition : 0 value : 2 threadId :127
listen-2 partition : 1 value : 1 threadId :129
listen-2 partition : 1 value : 1 threadId :129
listen-2 partition : 1 value : 1 threadId :129
listen-2 partition : 1 value : 2 threadId :129
listen-2 partition : 1 value : 1 threadId :129

다음과 같이 consumer 2개 전부 잘 작동한느 것을 확인할 수 있다. 어느 partition 메시지를 전송할지는 producer의 key에 따라서 결정된다. 여러 partition이 섞여있는 것처럼 보인다.

 

하지만 여기서 핵심은 trehadId 127(Consumer1)은 parition 0에만 접근을 하고, threadId 129(Consumer2)는 partition1에만 접근하는 것을 확인할 수 있다. kafka 자체에서 자동으로 이렇게 할당을 해준다. 이를 rebalancing이라고 한다.

이 전략에 대해서는 따로 공부할 필요성이 있다.

 

2개의 parition에 하나의 consumer만 있다면 어떻게 될까?

listen-1 partition : 0 value : 2 threadId :127
listen-1 partition : 0 value : 2 threadId :127
listen-1 partition : 0 value : 1 threadId :127
listen-1 partition : 0 value : 1 threadId :127
listen-1 partition : 0 value : 1 threadId :127
listen-1 partition : 0 value : 2 threadId :127
listen-1 partition : 0 value : 1 threadId :127
listen-1 partition : 0 value : 2 threadId :127
listen-1 partition : 0 value : 1 threadId :127
listen-1 partition : 1 value : 1 threadId :127
listen-1 partition : 1 value : 1 threadId :127
listen-1 partition : 1 value : 1 threadId :127
listen-1 partition : 1 value : 2 threadId :127
listen-1 partition : 1 value : 2 threadId :127
listen-1 partition : 1 value : 2 threadId :127
listen-1 partition : 1 value : 2 threadId :127
listen-1 partition : 1 value : 1 threadId :127
listen-1 partition : 1 value : 1 threadId :127
listen-1 partition : 1 value : 2 threadId :127
listen-1 partition : 1 value : 2 threadId :127

이처럼 하나의 consumer가 다 처리하게 된다.

 

 

다음에는 embedded kafka가 아닌 local에 kafka를 작동시킬 예정이다.

또한 하나의 서버가 아닌 두 대의 서버를 이용하여 실험을 해본다.

 

반응형

+ Recent posts