- 1. 자바 Concurrent 프로그래밍 소개
- 2. Executors
- 3. Callable과 Future
- 4. CompletableFuture (1)
- 5. CompletableFuture (2)
💡 동시에 여러 작업을 할 수 있는 소프트웨어
- 동시성
- 하나의 쓰레드나 프로세서의 여러 Task를 관리함으로써, 마치 동시에 작업이 수행되고 있는 것처럼 보이는 것
- 병렬성
- Task의 실행을 하드웨어 수준으로 실행되며, 각각의 작업들이 독립적임
- 멀티쓰레드
- 멀티프로세싱(ProcessBuilder)
버전 |
사용 방법 |
---|---|
Java 5 이전 | Runnable과 Thread를 이용하여 구현 |
Java 5 | ExecutorService, Callable, Future |
Java 7 | Fork/Join 그리고 RecursiveTask |
Java 8 | Stream, CompletableFuture |
Java 9 | 분산 비동기 프로그래밍은 명시적으로 지원 (발행 구독 프로토콜 지원 Flow API) |
-
Thread/Runnable
public class App { public static void main(String[] args) { // 첫번째 방법 MyThread myThread = new MyThread(); myThread.start(); // Anonymous class Thread runnable = new Thread(new Runnable() { @Override public void run() { System.out.println("Runnable Thread: " + Thread.currentThread().getName()); } }); // Lambda Expression Thread lambdaThread = new Thread(() -> System.out.println("Lambda Thread:" + Thread.currentThread().getName())); System.out.println("Hello: "+Thread.currentThread().getName()); } // Thread 상속 static class MyThread extends Thread { @Override public void run() { System.out.println("Thread: "+Thread.currentThread().getName()); } } } /* 실행결과 Hello: main Thread: Thread-0 */
- 코드의 실행 순서만 봐서는 Thread가 먼저 출력되야 할 것 같지만, 실제로 실행해보면 다르게 출력될 때도 있다.
- 이를 통해 Thread는 순서를 보장하지 않는다는 것을 알 수 있다.
여기선 로컬클래스를 이용했지만, 익명클래스와 람다표현식을 이용해서도 적용할 수 있다.
-
sleep(mills)
- 현재 쓰레드 재우기(멈춰두기)
- 스레드를 대기상태로 멈춰서 다른 스레드가 처리할 수 있도록 함.
- 하지만 락을 놔주진 않기에 잘못하면 데드락 상태에 걸릴 수 있다.
public static void main(String[] args) throws InterruptedException { // Lambda Expression Thread lambdaThread = new Thread(() -> { try { Thread.sleep(1000L); } catch(InterruptedException e){ System.out.println("interrupted!"); return; } System.out.println("Thread: "+Thread.currentThread().getName()); }); lambdaThread.start(); System.out.println("Hello: "+Thread.currentThread().getName()); }
Thread.sleep(1000L)
- Thread를 start하면 1초(1000L)동안 멈춰있고 그 동안 다른 쓰레드를 수행하기 때문에 Hello가 항상 우선 출력된다.
- 현재 쓰레드 재우기(멈춰두기)
-
interrupt()
- 다른 쓰레드를 깨우기
InterruptException
을 발생- 이 에러에 대한 핸들링은 구현 가능
public static void main(String[] args) throws InterruptedException { // Lambda Expression Thread lambdaThread = new Thread(() -> { try { Thread.sleep(3000L); } catch(InterruptedException e){ System.out.println("interrupted!"); return; } System.out.println("Thread: "+Thread.currentThread().getName()); }); lambdaThread.start(); lambdaThread.interrupt(); System.out.println("Hello: "+Thread.currentThread().getName()); }
lambdaThread.interrupt();
- lambdaThread에 interrupt()메소드를 호출해 lambdaThread내에 InterruptedException 을 발생시킨다.
- 다른 쓰레드를 깨우기
-
join()
- 다른 쓰레드가 끝날 때까지 기다린다.
public static void main(String[] args) throws InterruptedException { //Lambda Expression Thread lambdaThread = new Thread(() -> { try { Thread.sleep(3000L); } catch(InterruptedException e){ System.out.println("interrupted!"); return; } System.out.println("Thread: "+Thread.currentThread().getName()); }); lambdaThread.start(); lambdaThread.join(); System.out.println("Hello: "+Thread.currentThread().getName()); }
lambdaThread.join();
- lambdaThread에 join()메소드를 호출하여 lambdaThread가 종료될 때까지 기다린다.
Runnable이나 Thread와 같은 Low-level이 아닌 고 수준(High-Level) Concurrency 프로그래밍
우리가 Runnable만 정의해서 제공해주면 스레드를 만들고, 불필요해지면 종료하고 관리해주는 작업들을 대신 해주는 클래스
- 쓰레드 만들기: 애플리케이션이 사용할 쓰레드 풀을 만들어관리한다.
- 쓰레드 관리: 쓰레드 생명 주기를 관리한다.
- 작업 처리 및 실행: 쓰레드로 실행할 작업을 제공할 수 있는 API를 제공한다.
Executor
: execute(Runnable)ExecutorService
: Executor를 상속 받은 인터페이스로, Callable도 실행 가능하며 Executor를 종료 시키거나 여러 Callable을 동시에 실행하는 등의 기능을 제공한다.ScheduledExecutorService
: ExecutorService를 상속 받은 인터페이스로 특정 시간 이후에 또는 주기적으로 작업 실행할 수 있다.
public static void main(String[] args) throws InterruptedException {
// ExecutorService 생성
ExecutorService executorService = Executors.newSingleThreadExecutor();
// Legacy case
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println("Thread: "+Thread.currentThread().getName());
}
});
// Lambda Expression
executorService.submit(()->{
System.out.println("Lambda Expression Thread: "+Thread.currentThread().getName());
});
executorService.shutdown(); // graceful shutdown
// executorService.shutdownNow(); //즉시 종료
}
public class App {
private static Runnable getRunnable(String message) {
return () -> System.out.println(message + Thread.currentThread().getName());
}
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.submit(getRunnable("Hello"));
executorService.submit(getRunnable("World"));
executorService.submit(getRunnable("The"));
executorService.submit(getRunnable("Java"));
executorService.submit(getRunnable("Lecture"));
executorService.submit(getRunnable("Concurrent"));
executorService.submit(getRunnable("Part"));
executorService.shutdown(); //graceful shutdown
}
}
/* 실행결과
Hellopool-1-thread-1
Worldpool-1-thread-2
Javapool-1-thread-2
Thepool-1-thread-1
Lecturepool-1-thread-2
Concurrentpool-1-thread-1
Partpool-1-thread-2
*/
Executors.newFixedThreadPool(2)
- 해당 메소드를 호출하면 해당 영역에는 인자값으로 넘겨준 숫자만큼 Thread를 관리한다.
- 위 코드에서는 2를 인자값으로 넘겨줬기 때문에 2개의 2개의 쓰레드를 관리하는 Thread Pool이 도는 동안 Blocking Queue에 등록된 작업들이 차례대로 동작한다.
출처 : https://catsbi.oopy.io/50d5af24-a724-40a1-981b-a9f00ff521ad#638f51da-ae38-4574-baf5-d4f331ee6fbe
ExecutorService.newSingleThreadScheduledExcutor();
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
executorService.scheduleAtFixedRate(getRunnable("hello"), 3, 1, TimeUnit.SECONDS);
- scheduleAtFixedRate(실행 Runnable, 시작 지연 시간, 딜레이, 파라미터 시간 단위)
- 위 코드는 Runnable타입을 반환하는 getRunnable() 메소드를 프로그램이 시작 후 3초 뒤부터 1초마다 수행하는 코드
supplyAsync
등의 메소드 호출시 쓰레드 풀을 명시하지 않으면 Java ForkJoinPool 의commonPool()
이 사용된다.- 개발자가 쓰레드 풀을 제어할 수 없다는 것은 나중에 문제가 될 수 있다.
- 따라서, 항상 Java ExecutorService 를 명시적으로 사용하여 쓰레드 풀을 지정하도록 한다.
- Runnable과 거의 유사하지만 반환 값을 가질 수 있다.
- 비동기적인 작업의 현재 상태를 조회하거나 결과를 가져올 수 있다.
- 해당 메소드는
블록킹 콜
이기에 메소드 호출시점부터 코드실행 완료까지 기다린다. - 타임아웃을 설정할 수 있다.
ExecutorService executorService = Executors.newSingleThreadExecutor();
Callable<String> hello = () ->{
Thread.sleep(2000L);
return "Hello";
};
Future<String> submit = executorService.submit(hello);
System.out.println("Started!");
submit.get();// blocking
System.out.println("End");
executorService.shutdown();
/*
Started!
(2초 뒤)
End
*/
ExecutorService executorService = Executors.newSingleThreadExecutor();
Callable<String> hello = () ->{
Thread.sleep(2000L);
return "Hello";
};
Future<String> helloFuture = executorService.submit(hello);
System.out.println(helloFuture.isDone());
System.out.println("Started!");
helloFuture.get(); // blocking
System.out.println(helloFuture.isDone());
System.out.println("End");
executorService.shutdown();
/* 실행 결과
false
Started!
true
End
*/
- 인자 값으로 현재 진행중인 쓰레드 interrupt 여부를 결정한다.
- true 이면 현재 진행중인 쓰레드를 interrupt하고 그렇지 않으면 현재 진행중인 작업이 끝날때까지 기다린다.
ExecutorService executorService = Executors.newSingleThreadExecutor();
Callable<String> hello = () ->{
Thread.sleep(2000L);
return "Hello";
};
Future<String> helloFuture = executorService.submit(hello);
System.out.println(helloFuture.isDone());
System.out.println("Started!");
helloFuture.cancel(false);
System.out.println(helloFuture.isDone());
System.out.println("End");
helloFuture.get();
executorService.shutdown();
/* 실행 결과
false
Started!
true
End
Exception in thread "main" java.util.concurrent.CancellationException...
*/
helloFuture.cancel(false)
- 현재 진행중인 작업을 기다린 뒤 작업을 취소한다.
- 작업이 취소되어 종료되었기 때문에 아래에
helloFuture.isDone()
은 true가 반환되며, 이미 취소한 작업을 get() 호출하는 시점에는CancellationException
예외가 발생
ExecutorService executorService = Executors.newSingleThreadExecutor();
LocalDateTime start = LocalDateTime.now();
Callable<String> hello = () ->{
Thread.sleep(2000L);
return "Hello";
};
Callable<String> java = () ->{
Thread.sleep(3000L);
return "java";
};
Callable<String> youngjun = () ->{
Thread.sleep(1000L);
return "youngjun";
};
List<Future<String>> futures =
executorService.invokeAll(Arrays.asList(hello, java, youngjun));
for (Future<String> future : futures) {
System.out.println(future.get());
}
LocalDateTime end = LocalDateTime.now();
Duration between = Duration.between(start, end);
System.out.println(between.getSeconds());
/*
Hello
java
youngjun
6
*/
-
invokeAll()
메소드는 태스크가 모두 끝날때까지 기다렸다가 값들을 반환- 싱글 쓰레드이기 때문에 6초가 소요된다.
- future list를 반환하고 전부 끝날때까지 holding된다.
- 넘겨준 Callable list가 정상 처리되든 exception이 발생하든 완료된 것으로 본다.
- 동작중에 전달받은 list가 변경되면 결과를 보장하지 않는다
- 넘겨준 list 순서대로 결과 future를 담아서 넘겨준다.
발생할 수 있는 Exception
- InterruptedException : 동작이 종료되지 않은(동작중인) task가 취소 되었을 때
- NullPointerException : 함수의 param중 null이 있을 때
- RejectedExecutionException : task를 pool에 넣을수 없을 때.
-
따라서 가장 먼저 완료된 작업만 반환해도 괜찮다면 invokeAll을 쓰기에는 성능이 떨어진다.
- 그럴 때 사용 할 수 있는 메소드가
invokeAny
이다.
- 그럴 때 사용 할 수 있는 메소드가
- 동시에 실행한 작업 중에 제일 짧게 걸리는 작업 만큼 시간이 걸린다.
- 블록킹 콜이다.
String result = executorService.invokeAny(Arrays.asList(hello, java, youngjun));
System.out.println("result = " + result);
/* 실행 결과
result = youngjun
*/
주의할 점은, 싱글 쓰레드로 할 경우 먼저 들어간 순서대로 나오게 된다는 점이다.
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Callable<String> hello = () -> {
Thread.sleep(2000L);
return "Hello";
};
Callable<String> java = () -> {
Thread.sleep(3000L);
return "java";
};
Callable<String> youngjun = () -> {
Thread.sleep(1000L);
return "youngjun";
};
String s = executorService.invokeAny(Arrays.asList(hello, java, youngjun));
System.out.println(s);
executorService.shutdown();
}
/* 실행 결과
Hello
*/
자바에서 비동기(Asynchronous)프로그래밍을 가능하게하는 인터페이스.
Future의 제약사항들을 해결한다.
- 예외 처리용 API를 제공하지 않는다.
- 여러 Future를 조합할 수 없다. (ex: Event정보를 받아 다음 Event에 참석할 회원목록조회)
- Future를 외부에서 완료시킬 수 없다. 취소하거나, get()에 타임아웃을 설정할 수는 있다.
- get()을 호출하기 전까지는 future를 다룰 수 없다.
ExecutorService executorService = Executors.newFixedThreadPool(4);
Future<String> future = executorService.submit(() -> "hello");
...//TODO
future.get();
...//TODO
- 여기서 future는
blocking call
future를 get()으로 가져오는 동안에는 다른 작업들의 수행이 안된다는 의미이고 그 기간이 길어질수록 성능은 떨어질수 밖에 없다.
Future와 CompletionStage를 구현하는 구현체
- 예외처리를 지원하는 메서드
- 순서의 의존관계를 맞는 스레드 프로그래밍
- 콜백을 지원하기도 하며 여러 스레드를 하나로 묶어 처리하기에도 용이
public class CompletableFuture<T> implements Future<T>, CompletionStage<T>{...}
CompletableFuture<String> future = new CompletableFuture<>();
future.complete("youngjun");
System.out.println("future = " + future.get());
//-------------OR --------------
CompletableFuture<String> future = CompletableFuture.completedFuture("youngjun");
System.out.println("future = " + future.get());
-
리턴값이 없는 경우: runAsync()
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { System.out.println("Hello" + Thread.currentThread().getName()); }); future.get();
-
리턴값이 있는 경우: supplyAsync()
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { System.out.println("Hello" + Thread.currentThread().getName()); return "Hello"; }); future.get();
-
원하는 Executor(쓰레드풀)를 사용해서 실행할 수도 있다. (기본은 ForkJoinPool.commonPool())
-
thenApply(Function)
리턴값을 받아서 다른 값으로 바꾸는 콜백
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { System.out.println("Hello" + Thread.currentThread().getName()); return "Hello"; }).thenApply((s)->{ System.out.println("content: "+s); System.out.println(Thread.currentThread().getName()); return "HelloAsync"; }); System.out.println(future.get());
- Javascript의 Promise 와 유사한 형태
supplyAsync
의 람다표현식에서 반환된 Hello라는 값은 체이닝된 메소드thenApply
의 인자값으로 들어가고 사용 가능- 그리고 더 이상 체이닝된 메소드가 없기 때문에 return값인 HelloAsync는 반환되어 future로 들어가고 get()을통해 받을 수 있다.
-
thenAccept(Consumer)
리턴값을 받아 또 다른 작업을 수행하는데 반환값은 없는 콜백 (리턴 x)
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { System.out.println("Hello" + Thread.currentThread().getName()); return "Hello"; }).thenAccept((s)->{ System.out.println("content: "+s); System.out.println(Thread.currentThread().getName()); // return 없다. }); future.get();
-
thenRun(Runnable)
리턴값을 받지 않고 다른 작업을 수행하는 콜백
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { System.out.println("Hello" + Thread.currentThread().getName()); return "hello"; }).thenRun(()->{ System.out.println(Thread.currentThread().getName()); }); future.get();
- from Java 8 in Action
- ParallelStream : I/O가 포함되지 않은 계산 중심의 동작을 실행할 때는 스트림 인터페이스가 가장 구현하기 간다하며 효율적일 수 있다(모든 스레드가 계산 작업을 수행하는 상황에서는 프로세서 코어 수 이상의 쓰레드를 가질 필요가 없다).
- CompletableFuture : 반면 작업이 I/O를 기다리는 작업을 병렬로 실행할 때는
CompletableFuture
가 더 많은 유연성을 제공하며 대기/계산(W/C)의 비율에 적합한 스레드 수를 설정할 수 있다. 특히 스트림의 게으른 특성 때문에 스트림에서 I/O를 실제로 언제 처리할 지 예측하기 어려운 문제도 있다. - fahd.blog: Java 8: CompletableFuture vs Parallel Stream
- 두 작업이 서로 이어서 실행하도록 조합하며 연관된 future간에 많이 사용
public class App {
public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFuture<String> helloFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello " + Thread.currentThread().getName());
return "Hello";
});
CompletableFuture<String> future = helloFuture.thenCompose(App::getWorldFuture);
System.out.println(future.get());
}
private static CompletableFuture<String> getWorldFuture(String message) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("World " + Thread.currentThread().getName());
return message + " World";
});
}
}
/*
Hello ForkJoinPool.commonPool-worker-3
World ForkJoinPool.commonPool-worker-5
Hello World
*/
- 두 작업을 독립적으로 실행하고 둘 다 종료 했을때 콜백 실행.
public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFuture<String> msFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("MicroSoft " + Thread.currentThread().getName());
return "MicroSoft";
});
CompletableFuture<String> appleFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("Apple " + Thread.currentThread().getName());
return "Apple";
});
CompletableFuture<String> resultFuture = msFuture.thenCombine(appleFuture, (i, j) -> i + " " + j);
System.out.println(resultFuture.get());
}
- 여러 작업을 모두 실행하고 모든 작업 결과에 콜백 실행
public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFuture<String> msFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("MicroSoft " + Thread.currentThread().getName());
return "MicroSoft";
});
CompletableFuture<String> appleFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("Apple " + Thread.currentThread().getName());
return "Apple";
});
List<CompletableFuture<String>> futures = Arrays.asList(msFuture, appleFuture);
CompletableFuture<List<String>> results =
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
results.get().forEach(System.out::println);
}
- 여러 작업 중 가장 끝난 하나의 결과를 콜백에 넘겨 실행
CompletableFuture<String> msFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("MicroSoft " + Thread.currentThread().getName());
return "MicroSoft";
});
CompletableFuture<String> appleFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("Apple " + Thread.currentThread().getName());
return "Apple";
});
CompletableFuture<Void> future = CompletableFuture.anyOf(msFuture, appleFuture).thenAccept(System.out::println);
future.get();
boolean throwError = true;
CompletableFuture<String> msFuture = CompletableFuture.supplyAsync(() -> {
if (throwError) {
throw new IllegalArgumentException();
}
System.out.println("MicroSoft " + Thread.currentThread().getName());
return "MicroSoft";
}).exceptionally(ex->{
System.out.println(ex);
return "Error";
});
System.out.println(msFuture.get());
boolean throwError = false;
CompletableFuture<String> msFuture = CompletableFuture.supplyAsync(() -> {
if (throwError) {
throw new IllegalArgumentException();
}
System.out.println("MicroSoft " + Thread.currentThread().getName());
return "MicroSoft";
}).handle((result, ex)->{
if (Objects.nonNull(ex)) {
System.out.println(ex);
return "ERROR";
}
return result;
});
System.out.println(msFuture.get());
- 아래는 microservices.io 사이트에서 가져온 예제이다.
- CompletableFuture 를 사용할 만한 부분은 API GATEWAY, Storefront WebApp 라고 할 수 있다. 이유는 Account, Inventory, Shipping Service 는 DATA BASE 와 연동하고 있기 때문이다. java 환경에서 RDBMS 연동시에 아직 Async 한 인터페이스 방식을 제공하고 있지 않기 때문에 Async 개발을 해서 큰 효과를 얻기 부족하다.
- 하지만 GATEWAY 등은 REST Service 와 연동하기 때문에 Async 를 통한 성능 효과를 볼 수 있다. (Blocking Thread 문제 해결 등)
Reference