안정적 비동기 프로그래밍
업데이트:
Future 단순 활용
- 비동기 계산을 모델링하는데 Future를 이용할 수 있으며 계산이 끝났을 때 결과에 접근할 수 있는 참조를 제공한다.
- 오래걸리는 작업은 Callable 내부 객체로 감싸 비동기적으로 실행한다.
ExecutorService executor =Executors.newCachedThreadPool();
Future<Double> future = executor.submit(new Callable<Double>() {
public Double call() {
//시간이 걸리는 작업
return doSomeLongComputation();
}
})
//비동기 작업을 수행하는동안 다른 작업도 동작
doSomeThingElse();
//1초까지 결과가 나올때까지 대기하고 블록된다.
future.get(1, TimeUnit.SECONDS);
Future 제한
- 두개의 결과가 하나로 합쳐질때 각각 서로 독립적일 수 있으며 하나에 의존적일 수 있다.
- Future 집합이 실행하는 모든 태스크의 완료를 기다린다.
- 다양한 방식으로 같은 결과를 구할때 가장 빨리 완료되는 태스크를 기다렸다가 결과를 얻는다.
- 프로그램적으로 완료시킨다.(비동기 동작에 수동으로 결과 제공)
- 블록되지 않고 결과가 준비되었다는 알람을 받아야 원하는 추가동작이 가능하다.
동기 API와 비동기 API
전통적인 동기 APi에서는 메서드를 호출한 다음에 계산을 완료한 후 다음 계산을 호출한다. 이를 블록 호출이라한다.
비동기 API는 즉시 반환되며 끝내지 못한 나머지 작업은 호출자 스레드와 동기적으로 실행될 수 있도록 다른 스레드에 할당한다. 이를 비블록 호출이라 한다. 다른 스레드에 할당된 나머지 계산 결과는 콜백 메서드를 호출해서 전달하거나 계산 결과가 끝날 때까지 기다름 메서드를 추가해 호출하면서 전달한다.
비동기 API 구현
두가지 이상의 작업을 함께 작업하려면 비동기 식으로 구현해야 한다. CompletableFuture를 사용해 간단히 비동기 API를 구현한다.
public Future<Double> getPriceAsync(String product){
// 계산 결과를 포함하는 CompletableFuture
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
new Thread(
//다른 스레드에서 비동기적으로 계산을 수행
double price = calculatePrice(product);
//오래 시간이 걸리는 계산이 완료되면 future에 값 설정
futurePrice.complete(price)
).start();
//계산 완료를 기다리지 않고 Future를 반환
return futurePrice;
}
Future로 값을 반환 받으며 계산중에 다른 작업과 동시에 작업이 가능하다.
get() 통해서 값을 받고 계산중이면 블록한다.
에러 처리 방법
계산중에 문제가 발생시 get 메서드가 반환될때까지 기다릴 수 없으니 get메서드의 오버로드 버전을 만들어 이를 해결한다.
public Future<Double> getPriceAsync(String product){
// 계산 결과를 포함하는 CompletableFuture
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
new Thread(
try{
double price = calculatePrice(product);
//계산이 정상적으로 종료되면 Future에 값을 포함해 반환
futurePrice.complete(price)
}catch(e){
//문제 발생시 에러를 포함시켜 Future 반환
futurePrice.completeExceptionally(e);
}
).start();
return futurePrice;
}
supplyAsync
- supplyAsync 메서드는 Supplier를 인수로 받아 CompletableFuture를 반환한다.
- Supplier를 실행해서 비동기적으로 결과를 생성하는데 ForkJoinPool의 Executor 중 하나가 실해된다.
- 두번쨰 인수를 통해 오버로드 버전의 supplyAsync 메서드를 이용해서 Executor를 지정 할 수 있다.
publlic Future<Double> getPriceAsync(String product){
return CompletableFuture.supplyAsync(() -> calculatePrice([rpdict));
}
비블록 코드 만들기
병렬 스트림으로 요청 병렬화를 통해 비블록 코드를 만든다.
public List<String> findPrices(String product){
return shops.parallelStream()
.map(...)
.collect(toList());
}
CompletableFuture로 비동기 호출 구현하기
- 두 map 연산을 하나의 스트림 파이프 라인이 아닌 두개의 스트림 파이프라인을 사용함으로써 순차 계산을 회피했다.
- 하나의 파이프라인을 사용시 게이른 특성으로 동기적, 순차적으로 이루어진다.
- 작업의 갯수와 스레드 갯수에 따라 병렬 처리방식보다 빠르거나 느릴 수 있다.
public List<String> findPrices(String product) {
List<CompletableFuture<String>> priceFutures = shops.stream()
.map(shop -> CompletableFuture.supplyAsync( //각각의 계산을 비동기로
() -> String.format("%s is %.2f", shop.getName(), shop.getPrice(product))))
.collect(Collectors.toList());
return priceFutures.stream()
.map(CompletableFuture::join) //모든 비동기 동작이 끝나길 기다린다.
.collect(Collectors.toList());
}
커스텀 Executor
- 최적의 스레드 수에 맞게 Excutor을 만드어 효율적으로 처리한다.
- 데본 스레드를 사용해서 자바 프로그램이 종료될때 강제로 실행이 종료될 수 있도록 처리한다.
- supplyAsync의 두번째 인수로 전달할 수 있다.
// 상점 수만큼 스레드를 갖는 풀을 생성(0~100사이)
pivate final Executor executor =
Executors.newFixedThreadPool(Math.min(shops.size(), 100), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true); // 프로그램 종료를 방해하지 않는 데몬 스레드를 사용.
return t;
}
});
I/O가 포함되지 않는 계산에는 스트림 인터페이스가 효율적일 수 있으며 포함되는 경우에는 CompletableFuture가 더 많은 유연성을 제공하여 처리 할 수 있다.
비동기 작업 파이프 라인
supplyAsync
- 전달받은 람다를 비동기적으로 수행한다.
thenApply
- CompletableFuture가 끝날때까지 블록하지 않는다.
- CompletableFuture 동작을 완료한 다음에 메서드로 전달된 람다 표현식을 적용할 수 있다.
thenCompose
- 두 비동기 연산을 파이프라인으로 만들 수 있다.
- CompletableFuture에 themCompose 메서드를 호출하고 Function에 넘겨주는 식으로 조합할 수 있다.
- Function은 첫번째 CompletableFuture 반환 결과를 인수로 받고 두번째는 반환한다.
- 두번째 CompletableFuture는 첫번째의 결과를 계산의 입력으로 사용한다.
private List<String> findDiscountedPriceWithCustomExecutor(String product) {
List<CompletableFuture<String>> collect = shops.stream()
//가격 정보 얻기
.map(shop -> CompletableFuture.supplyAsync(()
-> shop.getPrice(product), executor))
//Quote 파싱하기
.map(future -> future.thenApply(Quote::parse))
//CompletableFuture를 조합해서 할인된 가격 계산하기
.map(future -> future.thenCompose(
quote -> CompletableFuture.supplyAsync(
() -> Discount.applyDiscount(quote),executor)))
.collect(Collectors.toList());
return collect.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
}
thenCombine
- 독립적으로 실행된 두 개의 CompletableFuture 결과를 이용하여 연산한다.
- 두 결과를 어떻게 합칠지 정의된 BiFunction을 두번째 인수로 받는다.
thenCombineAsync
- 두개의 CompletableFuture결과를 반환하는 새로운 Future를 반환한다.
자바9 추가
- orTimeout : 지정된 시간이 지난 후 CompletableFuture를 TimeoutException으로 완료하게 한다.
- complteOnTimeout : 지정된 시간이 지난 후 지정한 기본 값을 이용해 연산을 이어가게 한다.
CompletableFuture의 종료에 대응하는 방법
- thenAccept : CompletableFuture가 생성한 결과를 어떻게 소비할지 미리 정한다.
- allOf : 전달받은 CompletableFuture 배열이 모두 완료될때 CompletableFuture
를 반환한다. - anyOf : 전달받은 CompletableFuture 배열 중 하나라도 작업이 끝났을 떄 완료한 CompletableFuture 반환한다.
댓글남기기