Skip to content

Latest commit

 

History

History
767 lines (585 loc) · 26.4 KB

park.md

File metadata and controls

767 lines (585 loc) · 26.4 KB

CompletableFuture

1. 자바 Concurrent 프로그래밍 소개

1-1. Concurrent 소프트웨어

💡 동시에 여러 작업을 할 수 있는 소프트웨어

동시성과 병렬성의 차이

  • 동시성
    • 하나의 쓰레드나 프로세서의 여러 Task를 관리함으로써, 마치 동시에 작업이 수행되고 있는 것처럼 보이는 것
  • 병렬성
    • Task의 실행을 하드웨어 수준으로 실행되며, 각각의 작업들이 독립적임

1-1-1. 자바에서 지원하는 Concurrent 프로그래밍

  • 멀티쓰레드
  • 멀티프로세싱(ProcessBuilder)
버전 사용 방법
Java 5 이전 Runnable과 Thread를 이용하여 구현
Java 5 ExecutorService, Callable, Future
Java 7 Fork/Join 그리고 RecursiveTask
Java 8 Stream, CompletableFuture
Java 9 분산 비동기 프로그래밍은 명시적으로 지원 (발행 구독 프로토콜 지원 Flow API)

1-1-2. 자바 멀티쓰레드 프로그래밍

  • Thread/Runnable

    public class App {
        public static void main(String[] args) {
    				// 첫번째 방법
            MyThread myThread = new MyThread();
            myThread.start();
    
            // Anonymous class
            Thread runnable = new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println("Runnable Thread: " + Thread.currentThread().getName());
                }
            });
    
            // Lambda Expression
            Thread lambdaThread = new Thread(() -> System.out.println("Lambda Thread:" + Thread.currentThread().getName()));
    
            System.out.println("Hello: "+Thread.currentThread().getName());
        }
    
        // Thread 상속
        static class MyThread extends Thread {
            @Override
            public void run() {
                System.out.println("Thread: "+Thread.currentThread().getName());
            }
        }
    }
    
    /* 실행결과
    Hello: main
    Thread: Thread-0
    */
    • 코드의 실행 순서만 봐서는 Thread가 먼저 출력되야 할 것 같지만, 실제로 실행해보면 다르게 출력될 때도 있다.
    • 이를 통해 Thread는 순서를 보장하지 않는다는 것을 알 수 있다.

여기선 로컬클래스를 이용했지만, 익명클래스와 람다표현식을 이용해서도 적용할 수 있다.

1-1-3. 주요기능(method)

  1. sleep(mills)

    • 현재 쓰레드 재우기(멈춰두기)
      • 스레드를 대기상태로 멈춰서 다른 스레드가 처리할 수 있도록 함.
      • 하지만 락을 놔주진 않기에 잘못하면 데드락 상태에 걸릴 수 있다.
    public static void main(String[] args) throws InterruptedException {
        // Lambda Expression
        Thread lambdaThread = new Thread(() -> {
            try {
                Thread.sleep(1000L);
            } catch(InterruptedException e){
                System.out.println("interrupted!");
                return;
            }
            System.out.println("Thread: "+Thread.currentThread().getName());
        });
        lambdaThread.start();
    
        System.out.println("Hello: "+Thread.currentThread().getName());
    }

    Thread.sleep(1000L)

    • Thread를 start하면 1초(1000L)동안 멈춰있고 그 동안 다른 쓰레드를 수행하기 때문에 Hello가 항상 우선 출력된다.
  2. interrupt()

    • 다른 쓰레드를 깨우기
      • InterruptException 을 발생
      • 이 에러에 대한 핸들링은 구현 가능
    public static void main(String[] args) throws InterruptedException {
        // Lambda Expression
        Thread lambdaThread = new Thread(() -> {
            try {
                Thread.sleep(3000L);
            } catch(InterruptedException e){
                System.out.println("interrupted!");
                return;
            }
            System.out.println("Thread: "+Thread.currentThread().getName());
        });
        lambdaThread.start();
    
        lambdaThread.interrupt();
        System.out.println("Hello: "+Thread.currentThread().getName());
    }

    lambdaThread.interrupt();

    • lambdaThread에 interrupt()메소드를 호출해 lambdaThread내에 InterruptedException 을 발생시킨다.
  3. join()

    • 다른 쓰레드가 끝날 때까지 기다린다.
    public static void main(String[] args) throws InterruptedException {
        //Lambda Expression
        Thread lambdaThread = new Thread(() -> {
            try {
                Thread.sleep(3000L);
            } catch(InterruptedException e){
                System.out.println("interrupted!");
                return;
            }
            System.out.println("Thread: "+Thread.currentThread().getName());
        });
        lambdaThread.start();
    
        lambdaThread.join();
        System.out.println("Hello: "+Thread.currentThread().getName());
    }

    lambdaThread.join();

    • lambdaThread에 join()메소드를 호출하여 lambdaThread가 종료될 때까지 기다린다.

2. Executors

Runnable이나 Thread와 같은 Low-level이 아닌 고 수준(High-Level) Concurrency 프로그래밍

우리가 Runnable만 정의해서 제공해주면 스레드를 만들고, 불필요해지면 종료하고 관리해주는 작업들을 대신 해주는 클래스

2-1. Executors가 하는 일

  • 쓰레드 만들기: 애플리케이션이 사용할 쓰레드 풀을 만들어관리한다.
  • 쓰레드 관리: 쓰레드 생명 주기를 관리한다.
  • 작업 처리 및 실행: 쓰레드로 실행할 작업을 제공할 수 있는 API를 제공한다.

2-2. 주요 인터페이스

  • Executor: execute(Runnable)
  • ExecutorService: Executor를 상속 받은 인터페이스로, Callable도 실행 가능하며 Executor를 종료 시키거나 여러 Callable을 동시에 실행하는 등의 기능을 제공한다.
  • ScheduledExecutorService: ExecutorService를 상속 받은 인터페이스로 특정 시간 이후에 또는 주기적으로 작업 실행할 수 있다.

2-3. 예제

2-3-1. 기본 사용 예제

public static void main(String[] args) throws InterruptedException {
		// ExecutorService 생성
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    
    // Legacy case
    executorService.execute(new Runnable() {
        @Override
        public void run() {
            System.out.println("Thread: "+Thread.currentThread().getName());
        }
    });
    // Lambda Expression
    executorService.submit(()->{
        System.out.println("Lambda Expression Thread: "+Thread.currentThread().getName());
    });

    executorService.shutdown(); // graceful shutdown
    // executorService.shutdownNow(); //즉시 종료
}

2-3-2. 2개의 Thread를 이용하여 실행

public class App {
    private static Runnable getRunnable(String message) {
        return () -> System.out.println(message + Thread.currentThread().getName());
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        executorService.submit(getRunnable("Hello"));
        executorService.submit(getRunnable("World"));
        executorService.submit(getRunnable("The"));
        executorService.submit(getRunnable("Java"));
        executorService.submit(getRunnable("Lecture"));
        executorService.submit(getRunnable("Concurrent"));
        executorService.submit(getRunnable("Part"));

        executorService.shutdown(); //graceful shutdown
    }
}
/* 실행결과
Hellopool-1-thread-1
Worldpool-1-thread-2
Javapool-1-thread-2
Thepool-1-thread-1
Lecturepool-1-thread-2
Concurrentpool-1-thread-1
Partpool-1-thread-2
*/

Executors.newFixedThreadPool(2)

  • 해당 메소드를 호출하면 해당 영역에는 인자값으로 넘겨준 숫자만큼 Thread를 관리한다.
  • 위 코드에서는 2를 인자값으로 넘겨줬기 때문에 2개의 2개의 쓰레드를 관리하는 Thread Pool이 도는 동안 Blocking Queue에 등록된 작업들이 차례대로 동작한다.

출처 : https://catsbi.oopy.io/50d5af24-a724-40a1-981b-a9f00ff521ad#638f51da-ae38-4574-baf5-d4f331ee6fbe

ExecutorService.newSingleThreadScheduledExcutor();

ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
executorService.scheduleAtFixedRate(getRunnable("hello"), 3, 1, TimeUnit.SECONDS);
  • scheduleAtFixedRate(실행 Runnable, 시작 지연 시간, 딜레이, 파라미터 시간 단위)
  • 위 코드는 Runnable타입을 반환하는 getRunnable() 메소드를 프로그램이 시작 후 3초 뒤부터 1초마다 수행하는 코드

2-4. Executor 정리

  • supplyAsync 등의 메소드 호출시 쓰레드 풀을 명시하지 않으면 Java ForkJoinPoolcommonPool() 이 사용된다.
  • 개발자가 쓰레드 풀을 제어할 수 없다는 것은 나중에 문제가 될 수 있다.
  • 따라서, 항상 Java ExecutorService 를 명시적으로 사용하여 쓰레드 풀을 지정하도록 한다.

3. Callable과 Future

3-1. Callable

  • Runnable과 거의 유사하지만 반환 값을 가질 수 있다.

3-2. Future

  • 비동기적인 작업의 현재 상태를 조회하거나 결과를 가져올 수 있다.

3-3. 메소드 살펴보기

3-3-1. 결과 반환 : get()

  • 해당 메소드는 블록킹 콜이기에 메소드 호출시점부터 코드실행 완료까지 기다린다.
  • 타임아웃을 설정할 수 있다.
ExecutorService executorService = Executors.newSingleThreadExecutor();
Callable<String> hello = () ->{
    Thread.sleep(2000L);
    return "Hello";
};
Future<String> submit = executorService.submit(hello);
System.out.println("Started!");

submit.get();// blocking

System.out.println("End");
executorService.shutdown();

/*
Started!
(2초 뒤)
End
*/

3-3-2. 작업 상태 확인 - isDone()

ExecutorService executorService = Executors.newSingleThreadExecutor();
Callable<String> hello = () ->{
    Thread.sleep(2000L);
    return "Hello";
};
Future<String> helloFuture = executorService.submit(hello);
System.out.println(helloFuture.isDone());
System.out.println("Started!");

helloFuture.get(); // blocking

System.out.println(helloFuture.isDone());
System.out.println("End");
executorService.shutdown();

/* 실행 결과
false
Started!
true
End
*/

3-3-3. 작업 취소 : cancel()

  • 인자 값으로 현재 진행중인 쓰레드 interrupt 여부를 결정한다.
  • true 이면 현재 진행중인 쓰레드를 interrupt하고 그렇지 않으면 현재 진행중인 작업이 끝날때까지 기다린다.
ExecutorService executorService = Executors.newSingleThreadExecutor();
Callable<String> hello = () ->{
    Thread.sleep(2000L);
    return "Hello";
};
Future<String> helloFuture = executorService.submit(hello);
System.out.println(helloFuture.isDone());
System.out.println("Started!");

helloFuture.cancel(false);

System.out.println(helloFuture.isDone());
System.out.println("End");

helloFuture.get();
executorService.shutdown();

/* 실행 결과
false
Started!
true
End
Exception in thread "main" java.util.concurrent.CancellationException...
*/

helloFuture.cancel(false)

  • 현재 진행중인 작업을 기다린 뒤 작업을 취소한다.
  • 작업이 취소되어 종료되었기 때문에 아래에 helloFuture.isDone()은 true가 반환되며, 이미 취소한 작업을 get() 호출하는 시점에는 CancellationException 예외가 발생

3-3-4. 여러 작업 동시 실행 : invokeAll()

ExecutorService executorService = Executors.newSingleThreadExecutor();
LocalDateTime start = LocalDateTime.now();
Callable<String> hello = () ->{
    Thread.sleep(2000L);
    return "Hello";
};
Callable<String> java = () ->{
    Thread.sleep(3000L);
    return "java";
};
Callable<String> youngjun = () ->{
    Thread.sleep(1000L);
    return "youngjun";
};

List<Future<String>> futures = 
        executorService.invokeAll(Arrays.asList(hello, java, youngjun));
for (Future<String> future : futures) {
    System.out.println(future.get());
}
LocalDateTime end = LocalDateTime.now();
Duration between = Duration.between(start, end);
System.out.println(between.getSeconds());

/*
Hello
java
youngjun
6
*/
  • invokeAll() 메소드는 태스크가 모두 끝날때까지 기다렸다가 값들을 반환

    • 싱글 쓰레드이기 때문에 6초가 소요된다.
    • future list를 반환하고 전부 끝날때까지 holding된다.
    • 넘겨준 Callable list가 정상 처리되든 exception이 발생하든 완료된 것으로 본다.
    • 동작중에 전달받은 list가 변경되면 결과를 보장하지 않는다
    • 넘겨준 list 순서대로 결과 future를 담아서 넘겨준다.

    발생할 수 있는 Exception

    • InterruptedException : 동작이 종료되지 않은(동작중인) task가 취소 되었을 때
    • NullPointerException : 함수의 param중 null이 있을 때
    • RejectedExecutionException : task를 pool에 넣을수 없을 때.
  • 따라서 가장 먼저 완료된 작업만 반환해도 괜찮다면 invokeAll을 쓰기에는 성능이 떨어진다.

    • 그럴 때 사용 할 수 있는 메소드가 invokeAny 이다.

3-3-5. 여러 작업 동시 실행 : invokeAny()

  • 동시에 실행한 작업 중에 제일 짧게 걸리는 작업 만큼 시간이 걸린다.
  • 블록킹 콜이다.
String result = executorService.invokeAny(Arrays.asList(hello, java, youngjun));
System.out.println("result = " + result);

/* 실행 결과
result = youngjun
*/

주의할 점은, 싱글 쓰레드로 할 경우 먼저 들어간 순서대로 나오게 된다는 점이다.

public static void main(String[] args) throws InterruptedException, ExecutionException {
    ExecutorService executorService = Executors.newSingleThreadExecutor();

    Callable<String> hello = () -> {
        Thread.sleep(2000L);
        return "Hello";
    };

    Callable<String> java = () -> {
        Thread.sleep(3000L);
        return "java";
    };

    Callable<String> youngjun = () -> {
        Thread.sleep(1000L);
        return "youngjun";
    };

    String s = executorService.invokeAny(Arrays.asList(hello, java, youngjun));
    System.out.println(s);

    executorService.shutdown();
}

/* 실행 결과
Hello
*/

4. CompletableFuture (1)

4-1. 개요

자바에서 비동기(Asynchronous)프로그래밍을 가능하게하는 인터페이스.
Future의 제약사항들을 해결한다.

4-1-1. Future 제약

  • 예외 처리용 API를 제공하지 않는다.
  • 여러 Future를 조합할 수 없다. (ex: Event정보를 받아 다음 Event에 참석할 회원목록조회)
  • Future를 외부에서 완료시킬 수 없다. 취소하거나, get()에 타임아웃을 설정할 수는 있다.
  • get()을 호출하기 전까지는 future를 다룰 수 없다.
ExecutorService executorService = Executors.newFixedThreadPool(4);
Future<String> future = executorService.submit(() -> "hello");
...//TODO
future.get();
...//TODO
  • 여기서 future는 blocking call

future를 get()으로 가져오는 동안에는 다른 작업들의 수행이 안된다는 의미이고 그 기간이 길어질수록 성능은 떨어질수 밖에 없다.

4-2. CompletableFuture

Future와 CompletionStage를 구현하는 구현체

  • 예외처리를 지원하는 메서드
  • 순서의 의존관계를 맞는 스레드 프로그래밍
  • 콜백을 지원하기도 하며 여러 스레드를 하나로 묶어 처리하기에도 용이

public class CompletableFuture<T> implements Future<T>, CompletionStage<T>{...}

CompletableFuture<String> future = new CompletableFuture<>();
future.complete("youngjun");
System.out.println("future = " + future.get());
//-------------OR --------------
CompletableFuture<String> future = CompletableFuture.completedFuture("youngjun");
System.out.println("future = " + future.get());

4-2-1. 비동기로 작업 실행하기

  • 리턴값이 없는 경우: runAsync()

    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
        System.out.println("Hello" + Thread.currentThread().getName());
    });
    future.get();
  • 리턴값이 있는 경우: supplyAsync()

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        System.out.println("Hello" + Thread.currentThread().getName());
        return "Hello";
    });
    future.get();
  • 원하는 Executor(쓰레드풀)를 사용해서 실행할 수도 있다. (기본은 ForkJoinPool.commonPool())

4-2-2. 콜백 제공하기

  • thenApply(Function)

    리턴값을 받아서 다른 값으로 바꾸는 콜백

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        System.out.println("Hello" + Thread.currentThread().getName());
        return "Hello";
    }).thenApply((s)->{
        System.out.println("content: "+s);
        System.out.println(Thread.currentThread().getName());
        return "HelloAsync";
    });
    System.out.println(future.get());
    • Javascript의 Promise 와 유사한 형태
    • supplyAsync의 람다표현식에서 반환된 Hello라는 값은 체이닝된 메소드 thenApply의 인자값으로 들어가고 사용 가능
    • 그리고 더 이상 체이닝된 메소드가 없기 때문에 return값인 HelloAsync는 반환되어 future로 들어가고 get()을통해 받을 수 있다.
  • thenAccept(Consumer)

    리턴값을 받아 또 다른 작업을 수행하는데 반환값은 없는 콜백 (리턴 x)

    CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
        System.out.println("Hello" + Thread.currentThread().getName());
        return "Hello";
    }).thenAccept((s)->{
        System.out.println("content: "+s);
        System.out.println(Thread.currentThread().getName());
            // return 없다.
    });
    future.get();
  • thenRun(Runnable)

    리턴값을 받지 않고 다른 작업을 수행하는 콜백

    CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
        System.out.println("Hello" + Thread.currentThread().getName());
        return "hello";
    }).thenRun(()->{
        System.out.println(Thread.currentThread().getName());
    });
    future.get();

ParallelStream vs. CompletableFuture

  • from Java 8 in Action
  • ParallelStream : I/O가 포함되지 않은 계산 중심의 동작을 실행할 때는 스트림 인터페이스가 가장 구현하기 간다하며 효율적일 수 있다(모든 스레드가 계산 작업을 수행하는 상황에서는 프로세서 코어 수 이상의 쓰레드를 가질 필요가 없다).
  • CompletableFuture : 반면 작업이 I/O를 기다리는 작업을 병렬로 실행할 때는 CompletableFuture가 더 많은 유연성을 제공하며 대기/계산(W/C)의 비율에 적합한 스레드 수를 설정할 수 있다. 특히 스트림의 게으른 특성 때문에 스트림에서 I/O를 실제로 언제 처리할 지 예측하기 어려운 문제도 있다.
  • fahd.blog: Java 8: CompletableFuture vs Parallel Stream

5. CompletableFuture (2)

5-1. CompletableFuture 조합 메소드

5-1-1. thenCompose()

  • 두 작업이 서로 이어서 실행하도록 조합하며 연관된 future간에 많이 사용
public class App {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        CompletableFuture<String> helloFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello " + Thread.currentThread().getName());
            return "Hello";
        });

        CompletableFuture<String> future = helloFuture.thenCompose(App::getWorldFuture);
        System.out.println(future.get());

    }
    private static CompletableFuture<String> getWorldFuture(String message) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("World " + Thread.currentThread().getName());
            return message + " World";
        });
    }
}

/*
Hello ForkJoinPool.commonPool-worker-3
World ForkJoinPool.commonPool-worker-5
Hello World
*/

5-1-2. thenCombine()

  • 두 작업을 독립적으로 실행하고 둘 다 종료 했을때 콜백 실행.
public static void main(String[] args) throws InterruptedException, ExecutionException {
    CompletableFuture<String> msFuture = CompletableFuture.supplyAsync(() -> {
        System.out.println("MicroSoft " + Thread.currentThread().getName());
        return "MicroSoft";
    });

    CompletableFuture<String> appleFuture = CompletableFuture.supplyAsync(() -> {
        System.out.println("Apple " + Thread.currentThread().getName());
        return "Apple";
    });

    CompletableFuture<String> resultFuture = msFuture.thenCombine(appleFuture, (i, j) -> i + " " + j);
    
    System.out.println(resultFuture.get());
}

5-1-3. allOf()

  • 여러 작업을 모두 실행하고 모든 작업 결과에 콜백 실행
public static void main(String[] args) throws InterruptedException, ExecutionException {
    CompletableFuture<String> msFuture = CompletableFuture.supplyAsync(() -> {
        System.out.println("MicroSoft " + Thread.currentThread().getName());
        return "MicroSoft";
    });

    CompletableFuture<String> appleFuture = CompletableFuture.supplyAsync(() -> {
        System.out.println("Apple " + Thread.currentThread().getName());
        return "Apple";
    });
    List<CompletableFuture<String>> futures = Arrays.asList(msFuture, appleFuture);

    CompletableFuture<List<String>> results =
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
                    .thenApply(v -> futures.stream()
                            .map(CompletableFuture::join)
                            .collect(Collectors.toList()));
    results.get().forEach(System.out::println);
}

5-1-4. anyOf()

  • 여러 작업 중 가장 끝난 하나의 결과를 콜백에 넘겨 실행
CompletableFuture<String> msFuture = CompletableFuture.supplyAsync(() -> {
    System.out.println("MicroSoft " + Thread.currentThread().getName());
    return "MicroSoft";
});

CompletableFuture<String> appleFuture = CompletableFuture.supplyAsync(() -> {
    System.out.println("Apple " + Thread.currentThread().getName());
    return "Apple";
});

CompletableFuture<Void> future = CompletableFuture.anyOf(msFuture, appleFuture).thenAccept(System.out::println);
future.get();

5-2. 예외 처리 메소드

5-2-1. exeptionally(Function)

boolean throwError = true;

CompletableFuture<String> msFuture = CompletableFuture.supplyAsync(() -> {
    if (throwError) {
        throw new IllegalArgumentException();
    }

    System.out.println("MicroSoft " + Thread.currentThread().getName());
    return "MicroSoft";
}).exceptionally(ex->{
    System.out.println(ex);
    return "Error";
});

System.out.println(msFuture.get());

5-2-2. handle(BiFunction)

boolean throwError = false;

CompletableFuture<String> msFuture = CompletableFuture.supplyAsync(() -> {
    if (throwError) {
        throw new IllegalArgumentException();
    }

    System.out.println("MicroSoft " + Thread.currentThread().getName());
    return "MicroSoft";
}).handle((result, ex)->{
    if (Objects.nonNull(ex)) {
        System.out.println(ex);
        return "ERROR";
    }
    return result;
});

System.out.println(msFuture.get());

5-3. CompletableFuture 를 언제 쓸 수 있을까

  • 아래는 microservices.io 사이트에서 가져온 예제이다.

image

  • CompletableFuture 를 사용할 만한 부분은 API GATEWAY, Storefront WebApp 라고 할 수 있다. 이유는 Account, Inventory, Shipping Service 는 DATA BASE 와 연동하고 있기 때문이다. java 환경에서 RDBMS 연동시에 아직 Async 한 인터페이스 방식을 제공하고 있지 않기 때문에 Async 개발을 해서 큰 효과를 얻기 부족하다.
  • 하지만 GATEWAY 등은 REST Service 와 연동하기 때문에 Async 를 통한 성능 효과를 볼 수 있다. (Blocking Thread 문제 해결 등)

Reference