async하게 작동하기 위해서 자바는 여러 가지 기술을 지원해줍니다.

 

future나 callback이 있고, 최근에는 reactor가 있습니다.

 

Callback은 우선 return 값이 없습니다.

Callback의 문제점은 callback hell 입니다. callback 깊이가 깊어질수록 indent가 깊어져서 코드를 이해하기 어렵게 됩니다.

userService.getFavorites(userId, new Callback<List<String>>() { //callback inteface

	//성공하면 이거 호출?
  public void onSuccess(List<String> list) { 
    if (list.isEmpty()) { 
			//리스트가 비었으면, 여기서 다른 서비스의 콜백 호출함.
      suggestionService.getSuggestions(new Callback<List<Favorite>>() {
        public void onSuccess(List<Favorite> list) { 
          UiUtils.submitOnUiThread(() -> { 
            list.stream()
                .limit(5)
                .forEach(uiList::show); 
            });
        }

        public void onError(Throwable error) { 
          UiUtils.errorPopup(error);
        }
      });
    } else {
      list.stream() 
          .limit(5)
					//여기서도 다른 callback 호출. 너무 복잡함.
          .forEach(favId -> favoriteService.getDetails(favId, 
            new Callback<Favorite>() {
              public void onSuccess(Favorite details) {
                UiUtils.submitOnUiThread(() -> uiList.show(details));
              }

              public void onError(Throwable error) {
                UiUtils.errorPopup(error);
              }
            }
          ));
    }
  }

	//실패하면 이거?
  public void onError(Throwable error) {
    UiUtils.errorPopup(error);
  }
});

이를 reactor로 바꾸면?

userService.getFavorites(userId) 
            //detail들 가져옴. Flux<Details> 인 것 같음.
           .flatMap(favoriteService::getDetails)
            //만약 비었다면? suggestion을 가져옴.
           .switchIfEmpty(suggestionService.getSuggestions()) 
            //5개만?
           .take(5) 
            //이 뒤 쪽은 다른 thread에서 처리하겠다는 뜻.(이 부분이 없으면 main thread가 이를 처리함)
           .publishOn(UiUtils.uiThreadScheduler()) 
            //보여줌.
           .subscribe(uiList::show, UiUtils::errorPopup);

매우 간단해집니다.

stream이랑 비슷하다고 생각할 수 있습니다.

하지만 stream은 reactor와 달리 subscriber가 여러 개 있을 수 없다고 합니다.\

 

그 밖에도 backpressure 같은 기능을 제공해줍니다.

 

Future의 문제점은 get() 같은 함수를 호출하게 되면 그 부분에 blocking이 걸립니다.\

 

또한 lazy computation을 지원하지 않습니다.

이는 당장 필요하지 않는 부분은 나중에 계산 하는 것을 의미합니다.

예를 들면, JPA에서 프록시로 lazy fetch를 통해 그 값을 사용할 때 데이터를 가져옵니다.

그리고 a and b 같은 경우에는 a가 false인 경우에는 b를 계산하지 않습니다.

(future에서는 지원하지 않는다고 하는 이유는 모르겠습니다. 써본 적이 없어서....)

 

CompletableFuture<List<String>> ids = ifhIds(); 

CompletableFuture<List<String>> result = ids.thenComposeAsync(l -> { 
	Stream<CompletableFuture<String>> zip =
			l.stream().map(i -> { 
				CompletableFuture<String> nameTask = ifhName(i); 
				CompletableFuture<Integer> statTask = ifhStat(i); 

				return nameTask.thenCombineAsync(statTask, (name, stat) -> "Name " + name + " has stats " + stat); 
			});
	List<CompletableFuture<String>> combinationList = zip.collect(Collectors.toList()); 
	CompletableFuture<String>[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]);

	CompletableFuture<Void> allDone = CompletableFuture.allOf(combinationArray); 
	return allDone.thenApply(v -> combinationList.stream()
			.map(CompletableFuture::join) 
			.collect(Collectors.toList()));
});

List<String> results = result.join(); // 이 부분에서 다른 작업들이 끝날 떄 까지 기다림
assertThat(results).contains(
		"Name NameJoe has stats 103",
		"Name NameBart has stats 104",
		"Name NameHenry has stats 105",
		"Name NameNicole has stats 106",
		"Name NameABSLAJNFOAJNFOANFANSF has stats 121");

reactor는 매우 깔끔합니다.

Flux<String> ids = ifhrIds(); 

Flux<String> combinations =
		ids.flatMap(id -> { 
			Mono<String> nameTask = ifhrName(id); 
			Mono<Integer> statTask = ifhrStat(id); 

			return nameTask.zipWith(statTask, 
					(name, stat) -> "Name " + name + " has stats " + stat);
		});

Mono<List<String>> result = combinations.collectList(); 

List<String> results = result.block(); //끝날 때 까지 기다림?
assertThat(results).containsExactly( 
		"Name NameJoe has stats 103",
		"Name NameBart has stats 104",
		"Name NameHenry has stats 105",
		"Name NameNicole has stats 106",
		"Name NameABSLAJNFOAJNFOANFANSF has stats 121"
);

 

flatMap vs flatMapSequentail vs concatMap

 

flatMap은 eagerly subscribing 이고, async입니다.

하지만 순서는 보장해주지 않습니다.

그리고 원소들을 즉시 반환합니다. 즉, subcriber의 작업이 끝나면 해당 원소를 바로 반환합니다.

 

flatMapSequentail은 flatMap에서 순서를 보장해줍니다.

 

concatMap은 앞의 두 개와 다르게 egaer하지 않습니다. 하나의 데이터가 들어오면, 데이터와 결과를 페어로 반환해줍니다.(똑같이 async입니다.)

 

즉,

publish 1

sub 1

publish 2

sub 2 

...

처럼 이런 식으로 반환하게 됩니다.

 

 

 

 

참고:

https://stackoverflow.com/questions/71971062/whats-the-difference-between-flatmap-flatmapsequential-and-concatmap-in-project

 

Whats the difference between flatMap, flatMapSequential and concatMap in Project Reactor?

I've read from the documentation that flatMap: Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux through merging, w...

stackoverflow.com

https://projectreactor.io/docs/core/release/reference/

https://stackoverflow.com/questions/52820232/difference-between-infinite-java-stream-and-reactor-flux

반응형

+ Recent posts