반응형
CompletableFuture는
비동기 작업을 만들고(Async),
작업이 끝났을 때 할 일을 등록하고(Callback),
여러 비동기 작업을 연결/합치기 위한(Compose/Combine) 도구입니다.
기존 Future의 '직접 기다리기(get)만 된다' 한계를 콜백 체이닝 + 예외/타임아웃 처리로 해결합니다.
1. 왜 필요한가? (Future의 한계)
- Future#get()은 블로킹이라 결과가 올 때까지 스레드를 점유합니다.
- 미래의 결과를 다른 비동기 결과와 조합하거나, 성공/실패에 따라 다른 흐름을 만들기 어렵습니다.
- 타임아웃, 폴백(fallback), 체이닝 같은 고급 패턴을 직접 구현해야 합니다.
👉 CompletableFuture는 콜백 기반으로 이 문제를 해결합니다.
2. 핵심 용어
- 비동기(Async) : 다른 스레드에서 일을 시켜두고, 끝나면 알려달라(콜백)고 요청
- 콜백(Callback) : "끝나면 이거 해"를 등록 (thenApply/thenAccept/thenRun)
- 체이닝(Chaining) : 콜백 뒤에 콜백을 연결(...then...then...)
- 조합(Compose/Combine):
- thenCompose: 순차 의존(앞 결과를 받아 다음 비동기 호출)
- thenCombine: 병렬 독립(둘 다 끝난 뒤 합성)
3. 예제
3.1 값이 없는 작업 : runAsync
CompletableFuture<Void> f = CompletableFuture.runAsync(() -> {
System.out.println("비동기 작업 중...");
});
// 필요하면 맨 끝 경계에서만 기다리기
f.join();
3.2 값을 돌려주는 작업 : supplyAsync
CompletableFuture<Integer> f = CompletableFuture.supplyAsync(() -> 10);
Integer result = f.join(); // 10
3.3 콜백 등록 : thenApply / thenAccept / thenRun
CompletableFuture<Void> flow =
CompletableFuture.supplyAsync(() -> 10) // ① 10 생성
.thenApply(x -> x * 2) // ② 20
.thenAccept(v -> System.out.println("값: " + v)) // ③ 출력
.thenRun(() -> System.out.println("끝!")); // ④ 마무리
flow.join();
- thenApply: 값을 변환해서 다음으로 전달
- thenAccept: 값을 소비(출력 등), 반환 없음
- thenRun: 값 없이 후속 작업
…Async 변형들(예:thenApplyAsync)은 다음 콜백을 스레드풀에 위임합니다.
UI/서블릿 스레드 점유를 피할 때 유용합니다.
4. 순차 의존 vs 병렬 독립
4.1 순차 의존 : thenCompose
앞의 결과로 다음 비동기 호출을 해야 할 때 (=flatMap)
CompletableFuture<User> userF = findUserAsync(userId);
CompletableFuture<List<Order>> ordersF =
userF.thenCompose(user -> findOrdersByUserAsync(user));
List<Order> orders = ordersF.join();
4.2 병렬 독립 : thenCombine
둘을 동시에 돌려 결과를 합칠 때
CompletableFuture<Product> pF = fetchProductAsync(id);
CompletableFuture<Review> rF = fetchReviewAsync(id);
CompletableFuture<ProductView> viewF =
pF.thenCombine(rF, (p, r) -> new ProductView(p, r));
ProductView view = viewF.join();
5. 여러 개를 모아 처리 : allOf / anyOf
List<CompletableFuture<Integer>> list = List.of(1,2,3).stream()
.map(n -> CompletableFuture.supplyAsync(() -> n * 10))
.toList();
CompletableFuture<Void> all = CompletableFuture.allOf(list.toArray(new CompletableFuture[0]));
CompletableFuture<List<Integer>> results =
all.thenApply(v -> list.stream().map(CompletableFuture::join).toList());
System.out.println(results.join()); // [10, 20, 30]
- allOf: 모두 끝나면 이어서 진행
- anyOf: 하나라도 끝나면 이어서 진행
6. 예외처리
6.1 실패 시 대체값: exceptionally
CompletableFuture<String> cf =
CompletableFuture.supplyAsync(() -> { throw new RuntimeException("fail"); })
.exceptionally(ex -> "fallback");
System.out.println(cf.join()); // "fallback"
6.2 성공/실패 모두 처리: handle
CompletableFuture<String> cf =
CompletableFuture.supplyAsync(() -> maybeOk())
.handle((val, ex) -> (ex == null) ? val : "fallback");
6.3 관찰/로깅용 훅 : whenComplete
CompletableFuture<String> cf =
CompletableFuture.supplyAsync(() -> call())
.whenComplete((val, ex) -> {
if (ex != null) log.error("실패", ex);
else log.info("성공: {}", val);
});
원칙: 대체값으로
복구: exceptionally/handle,
로깅·정리: whenComplete.
7. 타임아웃/취소 - "기다림" 통제
7.1 N초 넘으면 실패시켜라: orTimeout (Java 9+)
CompletableFuture<String> f =
CompletableFuture.supplyAsync(() -> slowCall())
.orTimeout(500, TimeUnit.MILLISECONDS); // 0.5초 초과 시 TimeoutException
7.2 N초 넘으면 기본값으로 완료: completeOnTimeout
CompletableFuture<String> f =
CompletableFuture.supplyAsync(() -> slowCall())
.completeOnTimeout("cached", 500, TimeUnit.MILLISECONDS);
7.3 취소하기
CompletableFuture<String> f = new CompletableFuture<>();
f.cancel(true); // 이후 join/get은 CancellationException
8. 스레드풀(Executor)을 언제 넘기나?
- 기본은 ForkJoinPool.commonPool() 사용.
- 블로킹 I/O(HTTP, DB, 파일) 또는 무거운 연산이면 전용 풀을 주세요
ExecutorService ioPool = Executors.newFixedThreadPool(16);
CompletableFuture<String> f = CompletableFuture.supplyAsync(this::httpCall, ioPool);
이유: 공용 풀에 블로킹 작업을 던지면 풀 고갈 → 다른 작업이 줄줄이 대기(기아/데드락 유발 위험).
9. 실전 예시 코드
9.1 “두 API 병렬 호출 → 합성 → 폴백”
ExecutorService io = Executors.newFixedThreadPool(32);
CompletableFuture<Product> pF = CompletableFuture.supplyAsync(() -> fetchProduct(id), io);
CompletableFuture<Stock> sF = CompletableFuture.supplyAsync(() -> fetchStock(id), io)
.completeOnTimeout(new Stock(id, 0), 300, TimeUnit.MILLISECONDS)
.exceptionally(ex -> new Stock(id, 0));
CompletableFuture<Price> rF = CompletableFuture.supplyAsync(() -> fetchPrice(id), io);
ProductView view = pF.thenCombine(sF, Pair::of)
.thenCombine(rF, (ps, price) -> new ProductView(ps.getLeft(), ps.getRight(), price))
.orTimeout(800, TimeUnit.MILLISECONDS)
.exceptionally(ex -> fallbackView(id))
.join();
9.2 “순차 의존”
CompletableFuture<User> userF = CompletableFuture.supplyAsync(() -> findUser(userId), io);
CompletableFuture<List<Order>> ordersF = userF.thenCompose(user ->
CompletableFuture.supplyAsync(() -> findOrdersByUser(user), io)
);
List<Order> orders = ordersF.join();
9.3 “N개 태스크 전부 끝나면 집계”
List<CompletableFuture<Integer>> tasks = IntStream.range(0, 10)
.mapToObj(i -> CompletableFuture.supplyAsync(() -> work(i), io))
.toList();
int sum = CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0]))
.thenApply(v -> tasks.stream().mapToInt(CompletableFuture::join).sum())
.join();
10. 자주 하는 실수 체크리스트
- 공용 풀에 블로킹 I/O를 던지지 않았는가? → 전용 Executor 사용
- 체인 중간에서 join/get 하지 않았는가? → 가장 바깥 경계에서만
- 타임아웃/폴백이 필요한 외부 호출에 설정했는가?
- 에러 흐름을 exceptionally/handle로 설계했는가?
- 독립/의존 관계를 구분해 thenCombine/thenCompose를 적절히 선택했는가?
11. 메서드 모음집
- 시작: runAsync(void), supplyAsync(T)
- 변환/소비: thenApply / thenAccept / thenRun (+ …Async)
- 조합: thenCompose(순차), thenCombine·allOf·anyOf(병렬)
- 예외/관찰: exceptionally, handle, whenComplete
- 타임아웃/지연: orTimeout, completeOnTimeout, delayedExecutor
- 수동 완료/취소: complete, completeExceptionally, cancel
반응형
'Java' 카테고리의 다른 글
| [Java] 함수형 프로그래밍의 주요 개념들: 람다, 스트림, 고차 함수, 콜백 함수 등 (3) | 2025.08.11 |
|---|---|
| Java | JAVA의 컴파일 과정 (3) | 2024.12.20 |
| PostgreSQL, Java | Java Map 객체를 저장할 때 발생하는 문제와 해결방법 (1) | 2024.12.20 |
| Java | ExecutorService와 Future를 활용한 병렬 처리 이해 (0) | 2024.12.19 |
| Java | JIT 컴파일러에 대해 (0) | 2024.12.18 |