모던 자바 인 액션 16장

CompletableFuture로 비동기 어플리케이션 만들기

각종 온라인몰의 최저가를 구하는 어플리케이션을 구현하는 것을 통해, CompletableFuture을 구현하는 방법을 실습해 본다(실습이라기보다는 개념확인 정도에 가깝지 않을까..)

그 전에

CompletableFuture가 자바 8을 통해 출시되기 전, 자바5부터 지원했던 Future의 단순한 활용법을 한번 살펴보고 가자.

자바8에서 구현한 Future의 샘플 코드

ExecutorService executor = Executors.newCachedTHreadPool();
Future<Double> future = executor.submit(new Callcable<Double>() {
  public Double call() {
    return doSomeLongCompute(); // 시간이 오래 걸리는 작업. 다른 스레드를 통해서 비동기적으로 실행.
  }
});

doSomethingElse(); // 짬 날때 다른 작업을 ㄱㄱ

try{
  Double result = future.get(1, TimeUnit.SECONDS); // 블록. 단 최대 1초만 기다림.
} catch (Exception e) {
  //생략
}

뭐….

doSomeLongCompute(); 가 원래 하고자 하는 행위이고, 얘를 main스레드가 아닌 새로운 스레드를 통해 실행시키고

domSomethingElse(); 를 통해 그동안 main스레드는 다른 작업을 수행하는 로직이다.

그런데 문제가

future.get() 을 통해 main스레드가 ‘블록’상태에 빠진다는 것이다. 만약 doSomeLongCompute()가 억겁의 시간이 걸리는 작업이라면… 억겁의 시간동안 기다려야 하는 문제가 있다. 물론 그래서 코드상으로는 최대 블록 시간인 ‘1초’를 명시해 주어서 이 문제를 회피했지만 말이다.

Future는 제한사항이 몇가지 존재한다

홍길동이 오래 걸리는 작업 여러개를 Future로 구현하는 상황에 빠졌다고 한다. A,B,C,D 네가지 작업을 Future를 통해 구현한다고 하자.

‘오래 걸리는 A작업이 끝나면, 그 결과를 B에 전달하시오. 그리고 B의 결과가 나오면 C,D의 결과와 B를 조합하시오’

와 같은 요구조건/의존성이 있는 상황이라면 Future를 통해서 구현하기가 참 어렵다고 한다(뭐 해보진 않아서 모르겠다.)

Future에 다음과 같은 ‘선언형’ 기능이 있다면 유용할 것이라고 한다.

  • 두개의 비동기 계산결과를 하나로 합친다.
    • 두개의 결과를 서로 독립적일수도 있고, 의존성이 있을수도 있다
  • Future집합이 실행하는 모~든 작업의 완료를 기다린다.
    • A,B,C,D 가 다 끝날때까지 main스레드가 기다림.
  • Future집합중 제일 먼저 완료되는 작업 하나만 기다린다.
    • 같은 결과물을 여러 방법으로 찾을때(shortest path)
  • 프로그램적으로 Future를 완료시킨다
    • 비동기 동작에 수동으로 결과 제공
  • Future완료 동작에 반응
    • main스레드가 블록 상태로 기다리는게 아니라, Future가 능동적으로 main스레드에게 ‘나 끝남’ 이라는 알람을 주는 구조

이 모든 기능을

자바8에서 튀어나온 CompletableFuture가 만족했다고 한다!


그러니까… 이제 진짜로 어플리케이션을 만들어 보자

이 어플리케이션을 만드는 실습을 하는 동안 우리는 이와 같은 기술들을 습득할 수 있다!!!

  • 비동기 API를 제공하는 방법을 배운다.(온라인상점을 운영하고 있는 독자!!!!!!!!!!!!!!!!!!! 에게 특히 유용한 기술)
    • ??? : 뭣하러 비동기로 해? 서버 늘리고 오라클 비싼거 쓰면 되는거 아니야?
  • 동기 API를 사용해야 할 때 코드를 Non-Blocking으로 만드는 방법을 배운다.
    • 두개의 비동기 동작을 파이프라인으로 연결하던가, 두개의 결과를 하나의 비동기 계산으로 합치는…
  • 비동기 동작의 완료에 대응하는 법을 배운다.
    • 최저가를 찾을 때, 각 마켓마다 가격 응답이 오면 즉시 최저가 계산을 실시간으로 하는 방법

가격을 가져오는 목업 메소드 만들기

public class Shop {
  public double getPrice(String product) {
    return calculatePrice(product);
  }

  private double calculatePrice(String product) {
    delay(); //1sec sleep.... for mockup
    return random.nextDouble() * product.charAt(0) + product.charAt(1);
  }
}

목업을 위해 강제로 지연을 만듬..

저 getPrice() 메소드는 동기화 메소드이다. 이제 그 동기 메소드를 비동기 메소드로 변경해 보자.

상점 코드

public class Shop {
  public Future<Double> getPriceAsync(String product) { //change return type.
    CompletableFuture<Double> futurePrice = new CompletableFuture<>();
    new Thread (() -> {          //새롭게 스레드를 생성해서 calculatePrice() 수행 -비동기-
      double price = calculatePrice(product);
      futurePrice.complete(price); // 계산이 완료되면 futurePrice에 값 할당
    }).start();
    return futurePrice; // new Thread를 통한 계산이 끝나지 않고도 그냥 Future가 리턴됨
  }

  private double calculatePrice(String product) {
    delay(); //1sec sleep.... for mockup
    return random.nextDouble() * product.charAt(0) + product.charAt(1);
  }
}

깔-끔

그럼 이제 이 CompletableFuture를 사용하는 클라이언트 코드를 작성해 보자.
클라이언트 코드

  • 상점 코드는 비동기 API를 제공하므로, getPriceAsync() 실행 즉시 Future이 리턴된다. 클라이언트는 리턴된 Future를 이용해서, 나중에 결과를 받아볼 수 있다.
    • 클라이언트는 상점으로부터 Future타입의 가격값을 받을 때까지, 그 사이 다른 일을 할 수 있다(다른 상점의 가격값을 가져온다던지…뭐…)

그렇게 모든 코드가 계획대로 잘 수행된다면 좋지만

현실은 그렇지 않은 법. 모든 코드가 계획대로,생각대로 잘 수행될 리가 없다.

현재 예제 상황에서의 가장 큰 문제는, ‘가격을 계산하는 로직이 수행되다가 에러가 발생’ 하는 경우이다. main스레드가 아닌 별도의 스레드를 생성해서 로직을 수행시키는 과정에서 에러가 난 것이니, 에러의 주체는 ‘별도의 스레드’ 이고, 이를 명시적으로 main스레드에게 알려주기 전까지 main스레드는…… 오매불망 ‘가격을 계산하는 로직이 수행 완료’ 되기만을 기다릴 것이다.

그래서 예외처리가 중요하다.

상점 코드에다가, 예외가 발생하였을 시 main스레드에 이를 noti시켜주는 부분을 추가해 보자.

public class Shop {
  public Future<Double> getPriceAsync(String product) {
    CompletableFuture<Double> futurePrice = new CompletableFuture<>();
    new Thread (() -> {
      try {
        double price = calculatePrice(product);
        futurePrice.complete(price);
      } catch (Exception e) {
        futurePrice.completeExceptionally(e);  // 에러발생 시 에러로 Future종료.
      }
    }).start();
    return futurePrice;
  }

  private double calculatePrice(String product) {
    delay(); //1sec sleep.... for mockup
    return random.nextDouble() * product.charAt(0) + product.charAt(1);
  }
}

가독성

살이 붙어나가면서 getPriceAsync가 점점 더러워지기 시작한다….

그래서 CompletableFuture는 이런 더럽고 복잡한 소스를 한 줄로 제공해 주는 메소드가 있다.

public Future<Double> getPriceAsync(String product) {
  return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}

편-안

클라이언트 코드를 좀 더 고도화해 보자

사실, 배우고자 상점 코드를 손댄 것이지, 우리가 실제로 일 할때, callee의 API로직은 우리가 함부로 건드리지 못한다.(그럴 권한도 없고,그래서도 안되고)

이제부터는 상점 코드를 건드지리 못한다고 가정하고 && 가격을 가져오는 로직이 ‘동기’ 방식의 블록 메소드라고 가정한 채로…

caller인 클라이언트 코드를 좀 고도화 해 보자.

모든 상점에 순차적으로 정보를 요청하는 findPrices라는 메소드를 만든다고 하자.

List<Shop> shops = Arrays.asList(new Shop("쿠팡"),
                                 new Shop("네이버"),
                                 new Shop("11st"),
                                 new Shop("G마켓"));



public List<String> findPrices(String product) {
  return shops.stream()
              .map(shop -> String.format("%s price is %.2f", 
                                         shop.getName(), 
                                         shop.getPrice(product)))
              .collect(toList());
}

findPrices 수행시간을 생각해보자… 4초는 족히 넘는다. 이 코드대로라면 우리 할머니도 이것보단 빠른 결과값을 낼 것이다.

당연히 이런 4초나 걸리는 로직을 그대로 사용할 수는 없고, 이걸 병렬 스트림으로 요청해 보자.

List<Shop> shops = Arrays.asList(new Shop("쿠팡"),
                                 new Shop("네이버"),
                                 new Shop("11st"),
                                 new Shop("G마켓"));



public List<String> findPrices(String product) {
  return shops.parallelStream() //바꾼건 이것밖에 없음!!!!!
              .map(shop -> String.format("%s price is %.2f", 
                                         shop.getName(), 
                                         shop.getPrice(product)))
              .collect(toList());
}

편-안

하지만 이는 동기 호출이다. 단순히 stream만을 parallel하게 돌렸다고 해서 1초동안 블로킹이 걸리는 시간 틈새까지 쪼개서 사용할 수는 없다.

그래서 여기서 CompletableFuture를 추가해서 비동기 호출구조로 코드를 변경해 본다.

public List<String> findPrices(String product) {
  List<CompletableFuture<String>> priceFutures = shops.stream()
                                                      .map(shop -> 
                                                             CompletableFuture.supplyAsync(() -> 
                                                               shop.getName() + " price is " + shop.getPrice(product)))
                                                      .collect(toList());

  return priceFutures.stream()
                     .map(CompletableFuture::join)
                     .collect(toList());
}

편——안

join을 별도의 스트림으로 뺀 이유

빼지 않고 첫번째 스트림에다 join을 같이 넣어버리면, 첫번째 상점(쿠팡)의 결과값을 받아올때까지 스트림이 다음 요소를 실행시키지 않고 기다리기 때문…. 어찌보면 당연하다. 구현할때 주의하자.

Leave a Comment