CompletableFuture로 비동기 어플리케이션 만들기
각종 온라인몰의 최저가를 구하는 어플리케이션을 구현하는 것을 통해, CompletableFuture을 구현하는 방법을 실습해 본다(실습이라기보다는 개념확인 정도에 가깝지 않을까..)
그 전에
CompletableFuture가 자바 8을 통해 출시되기 전, 자바5부터 지원했던 Future의 단순한 활용법을 한번 살펴보고 가자.
자바8에서 구현한 Future의 샘플 코드
ExecutorService executor = Executors.newCachedTHreadPool();
Future<Double> future = executor.submit(new Callcable<Double>() {
public Double call() {
return doSomeLongCompute(); // 시간이 오래 걸리는 작업. 다른 스레드를 통해서 비동기적으로 실행.
}
});
doSomethingElse(); // 짬 날때 다른 작업을 ㄱㄱ
try{
Double result = future.get(1, TimeUnit.SECONDS); // 블록. 단 최대 1초만 기다림.
} catch (Exception e) {
//생략
}
뭐….
doSomeLongCompute(); 가 원래 하고자 하는 행위이고, 얘를 main스레드가 아닌 새로운 스레드를 통해 실행시키고
domSomethingElse(); 를 통해 그동안 main스레드는 다른 작업을 수행하는 로직이다.
그런데 문제가
future.get() 을 통해 main스레드가 ‘블록’상태에 빠진다는 것이다. 만약 doSomeLongCompute()가 억겁의 시간이 걸리는 작업이라면… 억겁의 시간동안 기다려야 하는 문제가 있다. 물론 그래서 코드상으로는 최대 블록 시간인 ‘1초’를 명시해 주어서 이 문제를 회피했지만 말이다.
Future는 제한사항이 몇가지 존재한다
홍길동이 오래 걸리는 작업 여러개를 Future로 구현하는 상황에 빠졌다고 한다. A,B,C,D 네가지 작업을 Future를 통해 구현한다고 하자.
‘오래 걸리는 A작업이 끝나면, 그 결과를 B에 전달하시오. 그리고 B의 결과가 나오면 C,D의 결과와 B를 조합하시오’
와 같은 요구조건/의존성이 있는 상황이라면 Future를 통해서 구현하기가 참 어렵다고 한다(뭐 해보진 않아서 모르겠다.)
Future에 다음과 같은 ‘선언형’ 기능이 있다면 유용할 것이라고 한다.
- 두개의 비동기 계산결과를 하나로 합친다.
- 두개의 결과를 서로 독립적일수도 있고, 의존성이 있을수도 있다
- Future집합이 실행하는 모~든 작업의 완료를 기다린다.
- A,B,C,D 가 다 끝날때까지 main스레드가 기다림.
- Future집합중 제일 먼저 완료되는 작업 하나만 기다린다.
- 같은 결과물을 여러 방법으로 찾을때(shortest path)
- 프로그램적으로 Future를 완료시킨다
- 비동기 동작에 수동으로 결과 제공
- Future완료 동작에 반응
- main스레드가 블록 상태로 기다리는게 아니라, Future가 능동적으로 main스레드에게 ‘나 끝남’ 이라는 알람을 주는 구조
이 모든 기능을
자바8에서 튀어나온 CompletableFuture가 만족했다고 한다!
그러니까… 이제 진짜로 어플리케이션을 만들어 보자
이 어플리케이션을 만드는 실습을 하는 동안 우리는 이와 같은 기술들을 습득할 수 있다!!!
- 비동기 API를 제공하는 방법을 배운다.(온라인상점을 운영하고 있는 독자!!!!!!!!!!!!!!!!!!! 에게 특히 유용한 기술)
- ??? : 뭣하러 비동기로 해? 서버 늘리고 오라클 비싼거 쓰면 되는거 아니야?
- 동기 API를 사용해야 할 때 코드를 Non-Blocking으로 만드는 방법을 배운다.
- 두개의 비동기 동작을 파이프라인으로 연결하던가, 두개의 결과를 하나의 비동기 계산으로 합치는…
- 비동기 동작의 완료에 대응하는 법을 배운다.
- 최저가를 찾을 때, 각 마켓마다 가격 응답이 오면 즉시 최저가 계산을 실시간으로 하는 방법
가격을 가져오는 목업 메소드 만들기
public class Shop {
public double getPrice(String product) {
return calculatePrice(product);
}
private double calculatePrice(String product) {
delay(); //1sec sleep.... for mockup
return random.nextDouble() * product.charAt(0) + product.charAt(1);
}
}
목업을 위해 강제로 지연을 만듬..
저 getPrice() 메소드는 동기화 메소드이다. 이제 그 동기 메소드를 비동기 메소드로 변경해 보자.
상점 코드
public class Shop {
public Future<Double> getPriceAsync(String product) { //change return type.
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
new Thread (() -> { //새롭게 스레드를 생성해서 calculatePrice() 수행 -비동기-
double price = calculatePrice(product);
futurePrice.complete(price); // 계산이 완료되면 futurePrice에 값 할당
}).start();
return futurePrice; // new Thread를 통한 계산이 끝나지 않고도 그냥 Future가 리턴됨
}
private double calculatePrice(String product) {
delay(); //1sec sleep.... for mockup
return random.nextDouble() * product.charAt(0) + product.charAt(1);
}
}
깔-끔
그럼 이제 이 CompletableFuture를 사용하는 클라이언트 코드를 작성해 보자.
클라이언트 코드


- 상점 코드는 비동기 API를 제공하므로, getPriceAsync() 실행 즉시 Future이 리턴된다. 클라이언트는 리턴된 Future를 이용해서, 나중에 결과를 받아볼 수 있다.
- 클라이언트는 상점으로부터 Future타입의 가격값을 받을 때까지, 그 사이 다른 일을 할 수 있다(다른 상점의 가격값을 가져온다던지…뭐…)
그렇게 모든 코드가 계획대로 잘 수행된다면 좋지만
현실은 그렇지 않은 법. 모든 코드가 계획대로,생각대로 잘 수행될 리가 없다.
현재 예제 상황에서의 가장 큰 문제는, ‘가격을 계산하는 로직이 수행되다가 에러가 발생’ 하는 경우이다. main스레드가 아닌 별도의 스레드를 생성해서 로직을 수행시키는 과정에서 에러가 난 것이니, 에러의 주체는 ‘별도의 스레드’ 이고, 이를 명시적으로 main스레드에게 알려주기 전까지 main스레드는…… 오매불망 ‘가격을 계산하는 로직이 수행 완료’ 되기만을 기다릴 것이다.
그래서 예외처리가 중요하다.
상점 코드에다가, 예외가 발생하였을 시 main스레드에 이를 noti시켜주는 부분을 추가해 보자.
public class Shop {
public Future<Double> getPriceAsync(String product) {
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
new Thread (() -> {
try {
double price = calculatePrice(product);
futurePrice.complete(price);
} catch (Exception e) {
futurePrice.completeExceptionally(e); // 에러발생 시 에러로 Future종료.
}
}).start();
return futurePrice;
}
private double calculatePrice(String product) {
delay(); //1sec sleep.... for mockup
return random.nextDouble() * product.charAt(0) + product.charAt(1);
}
}
가독성
살이 붙어나가면서 getPriceAsync가 점점 더러워지기 시작한다….
그래서 CompletableFuture는 이런 더럽고 복잡한 소스를 한 줄로 제공해 주는 메소드가 있다.
public Future<Double> getPriceAsync(String product) {
return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}
편-안
클라이언트 코드를 좀 더 고도화해 보자
사실, 배우고자 상점 코드를 손댄 것이지, 우리가 실제로 일 할때, callee의 API로직은 우리가 함부로 건드리지 못한다.(그럴 권한도 없고,그래서도 안되고)
이제부터는 상점 코드를 건드지리 못한다고 가정하고 && 가격을 가져오는 로직이 ‘동기’ 방식의 블록 메소드라고 가정한 채로…
caller인 클라이언트 코드를 좀 고도화 해 보자.
모든 상점에 순차적으로 정보를 요청하는 findPrices라는 메소드를 만든다고 하자.
List<Shop> shops = Arrays.asList(new Shop("쿠팡"),
new Shop("네이버"),
new Shop("11st"),
new Shop("G마켓"));
public List<String> findPrices(String product) {
return shops.stream()
.map(shop -> String.format("%s price is %.2f",
shop.getName(),
shop.getPrice(product)))
.collect(toList());
}
findPrices 수행시간을 생각해보자… 4초는 족히 넘는다. 이 코드대로라면 우리 할머니도 이것보단 빠른 결과값을 낼 것이다.
당연히 이런 4초나 걸리는 로직을 그대로 사용할 수는 없고, 이걸 병렬 스트림으로 요청해 보자.
List<Shop> shops = Arrays.asList(new Shop("쿠팡"),
new Shop("네이버"),
new Shop("11st"),
new Shop("G마켓"));
public List<String> findPrices(String product) {
return shops.parallelStream() //바꾼건 이것밖에 없음!!!!!
.map(shop -> String.format("%s price is %.2f",
shop.getName(),
shop.getPrice(product)))
.collect(toList());
}
편-안
하지만 이는 동기 호출이다. 단순히 stream만을 parallel하게 돌렸다고 해서 1초동안 블로킹이 걸리는 시간 틈새까지 쪼개서 사용할 수는 없다.
그래서 여기서 CompletableFuture를 추가해서 비동기 호출구조로 코드를 변경해 본다.
public List<String> findPrices(String product) {
List<CompletableFuture<String>> priceFutures = shops.stream()
.map(shop ->
CompletableFuture.supplyAsync(() ->
shop.getName() + " price is " + shop.getPrice(product)))
.collect(toList());
return priceFutures.stream()
.map(CompletableFuture::join)
.collect(toList());
}
편——안
join을 별도의 스트림으로 뺀 이유
빼지 않고 첫번째 스트림에다 join을 같이 넣어버리면, 첫번째 상점(쿠팡)의 결과값을 받아올때까지 스트림이 다음 요소를 실행시키지 않고 기다리기 때문…. 어찌보면 당연하다. 구현할때 주의하자.