JAVA/비동기 처리

JAVA CompletableFuture의 비동기 처리

수달하나 2023. 10. 12. 01:00

멀티코어 프로세서가 발전하면서 애플리케이션의 속도는 멀티 코어 프로세서를 얼마나 잘 활용할 수 있도록 소프트웨어를 개발하는가에 따라 달라질 수 있다. 여러가시 서비스를 동시에 제공하는 마이크로서비스 아키텍처의 선택이 지난 몇 년간 증가한것을 통해 이러한 추세를 확인할 수 있다.

 

위와 같은 서비스를 제공함에 있어서 동시간 여러 서비스의 응답을 기다리는 동안 연산이 블록되거나 귀중한 CPU 클록 사이클 자원을 낭비하고 싶지 않다면 동시성을 필요로 하는 상황 즉 조금씩 연관된 작업을 같은 CPU에서 동작하는 것 또는 애플리케이션의 생산성을 극대화 할 수 있도록 코어를 바쁘게 유지하는 것이 목표라면 원격 서비스나 데이터베이스 결과를 기다리는 스레드를 블록함으로 연산 자원을 낭비하는 일 등을 피해야만 한다.

 

위와 같은 환경을 구성하기 위해서는 크게 두 가지를 통해 해결 할 수 있는데 첫 번째는 Future 인터페이스를 사용하는 것이고 두 번째는 리액티브 프로그래밍의 개념을 따르는 flow API를 사용하는 것 이다.


동시성과 병렬성

동시성과 병렬성은 다른 의미다.

동시성은 단일 코어 머신에서 발생할 수 있는 프로그래밍 속성으로 실행이 서로 겹칠 수 있는 것을 의미한다. 하나의 코어에서 여러개의 작업이 번갈아가며 실행되는 것을 의미한다. 반면 병렬성은 여러 코어를 통해 작업들이 실행되는 것을 의미함으로 하나의 코어가 아닌 멀티코어 환경을 의미한다.


자바 동시성 구현의 발전

기존의 자바는 Runnable 과 Thread 를 통해서 동기화된 클래스와 메서드를 이용해 블록함으로써 동시성을 구현 하였다.

더 나아가 Executor 프레임워크와 스레드 풀을 통해서 스레드의 힘을 높은 수준으로 끌어올려 자바 프로그래머가 태스크 제출과 실행을 분리 할 수 있는 기능까지 제공했다.

더 나아가 ExecutorService 의 newFixedThreadPool을 통해 스레드 풀을 관리 할 수 있도록 했다. 프로그래머가 태스크, Runnable 혹은 Callable 을 제공하면 스레드가 이를 실행한다. 이러한 형태의 스레드 관리는 main이 종료되기 전 모든 스레드의 작업이 끝나길 기다리기 때문에 스레드 풀을 종료하는 습관을 갖는 것이 중요하다.


엄격한 포크/조인 과 여유로운 포크/조인

엄격한 포크/조인은 내스크나 스레드가 메서드 호출 안에서 시작되면 그 메서드 호출은 반환하지 않고 작업이 끝나기를 기다린다. 하지만 여유로운 포크/조인은 시작된 태스크를 내부 호출이 아니라 외부 호출에서 종료하도록 기다리는 좀 더 여유로운 방식의 사용법을 제안한다. 여유로운 포크/조인의 경우 사용자의 메서드 호출에 의해 스레드가 생성되지만 생성된 스레드를 벗어나 계속 실행되는 동시성 형태에 초점을 둔다. 

메서드 호출자에 기능을 제공하도록 메서드가 반환된 후에도 만들어진 태스크 실행이 계속되는 메서드를 비동기 메서드라 한다. 

  1. 메서드 실행
  2. 스레드 생성
  3. 메서드 종료

비동기 메서드는 메서드가 종료되는 시점에서도 스레드는 계속 실행이 되고 있다.

위와 같은 여유로운 포크/조인 사용법은 메서드를 호출한 다음의 코드와 동시에 실행되므로 데이터 경쟁 문제를 일으키지 않도록 주의해야 하고 기존 실행 중이던 스레드가 종료되지 않은 상황에서 main이 종료될 경우 어떤 방식으로 처리를 할 것인지에 대해 생각해야 한다. 

 

  • 데몬 스레드 : 애플리케이션이 종료될 때 강제 종료
  • 비데몬 스레드 : 애플리케이션이 종료될 때 비 종료

main 메서드는 모든 비 데몬 스레드가 종료될 때까지 프로그램을 종료하지 않고 기다린다.


CompletableFuture 의 다양한 메서드

자바 8에서 부터는 비동기 처리를 좀 더 효율적으로 할 수 있도록 CompletatbleFuture 클래스를 제공해준다. 기존 Future의 한계점을 보안하여 외부에서의 완료 처리, 블로킹 코드 이후 결과 처리와 여러 작업을 조합하여 처리하는 것 과 같은 다양한 기능들을 제공하고 있다.

 

CompletablaFuture 이 제공하는 다양한 메서드는 아래와 같다. 

  • runAsync : 반환값이 없는 비동기 작업
  • supplyAsync : 반환값이 있는 비동기 작업
CompletableFuture<Void> runAsyncResult = CompletableFuture.runAsync(()->{
    System.out.println("runAsync is execute");
});

CompletableFuture<String> supplyAsyncResult = CompletableFuture.supplyAsync(
        ()-> "supplyAsync is execute");
  • thenApply : 비동기 처리 후 값을 받아 다른 값으로 변환하여 반환 
  • thenAccept : 비동기 처리 후 값을 받아 새로운 작업을 실행
  • thenRun : 비동기 처리 후 값을 무시하고 새로운 작업을 실행 
CompletableFuture<String> thenApplyResult = getFuture()
        .thenApply(value -> value + " : thenApply execute");

CompletableFuture<Void> thenAcceptResult = getFuture().thenAccept(value -> {
    System.out.println(value + " : thenAccept execute");
});

CompletableFuture<Void> thenRunResult = getFuture().thenRun(() -> {
    System.out.println("thenRun execute");
});

위에서 말한 메서드는 단순히 하나의 작업을 처리하고 그 이후 값을 받아 처리하던지 혹은 다른 작업을 처리하는 것을 마지막으로 주기가 끝이 난다. 하지만 아래와 같은 메서드들은 비동기 처리를 조합하는 방식의 새로운 형태의 기능을 제공한다.

  • thenCompose : 하나의 비동기 작업이 끝나면 결과 값을 받아 두 번째 비동기 작업을 실행하고 값을 반환
  • thenCombine : 두 개의 비동기 작업이 동시에 실행되며 두 비동기 작업의 결과값을 처리하고 값을 반환
CompletableFuture<String> thenComposeResult =
        getFuture().thenCompose(value -> CompletableFuture.supplyAsync(() -> value + ": thenCompose is return"));

CompletableFuture<String> thenCombineResult =
        getFuture().thenCombine(CompletableFuture.supplyAsync(() -> "thenCombine is return"),
                (value1, value2) -> value1 + " : " + value2);
  • allOf : 여러 비동기 작업을 동시에 실행하고 모든 작업 결과를 반환
CompletableFuture<Void> allOfResult =
        CompletableFuture.allOf(getHello(), getWorld());

allOf 메서드의 경우 병렬적으로 태스크를 던지지만 결과 값을 반환받아 처리 할 수 없다. 결과 값을 반환받아 처리하고 싶다면 List를 통해 추가 작업을 진행해야 한다.

List<CompletableFuture<String>> list = List.of(getHello(), getWorld());
CompletableFuture<List<String>> allOfResultReturn =
        CompletableFuture.allOf(list.toArray(new CompletableFuture[list.size()]))
                .thenApply(value -> list
                        .stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));

하지만 위와 같은 CompletableFuture::join 의 메서드 참조는 비동기 처리를 위한 작업에 있어서 내부적으로 block을 호출하도록 한다. 따라서 아래와 같은 방식으로 변경하여 호출하는 것이 더 바람직한 방법이다.

List<Object> list = new ArrayList<>();
CompletableFuture<Void> allOfResultReturn =
        CompletableFuture.allOf(getHello(), getWorld())
                .thenAccept(list::add);

이후에 allOfResultReturn 의 isDone 메서드를 호출하여 모든 비동기 작업이 완료되었을 경우 list를 이용하여 결과 값을 확인 할 수 있다.

  • anyOf : 여러 비동기 작업을 동시에 실행하고 가장 빨리 끝난 작업의 결과를 반환
CompletableFuture<Object> anyOfResult =
        CompletableFuture.anyOf(getHello(), getWorld(), getNumber());