[Java] 스레드풀 - 블로킹 방식의 작업 완료 통보
블로킹 방식의 작업 완료 통보
ExecutorService의 submit() 메소드는 매개값으로 준 Runnable 또는 Callable 작업을 스레드풀의 작업 큐에 저장하고 즉시 Future 객체를 리턴한다.
리턴 타입 | 메소드명(매개 변수) | 설명 |
Future<?> | submit(Runnable task) | - Runnable 또는 Callable을 작업 큐에 저장 - 리턴된 Future를 통해 작업 처리 결과를 얻음 |
Future<V> | submit(Runnable task, V result) | |
Future<V> | submit(Callable<V> task) |
Future 객체는 작업 결과가 아니라 작업이 완료될 때까지 기다렸다가(지연했다가 = 블로킹 되었다가) 최종 결과를 얻는데 사용된다. 그래서 Future를 지연 완료(pending completion) 객체라고 한다. Future의 get() 메소드를 호출하면 스레드가 작업을 완료할 때까지 블로킹되었다가 작업을 완료하면 처리 결과를 리턴한다. 이것이 블로킹을 사용하는 작업 완료 통보 방식이다. 다음은 Future가 가지고 있는 get() 메소드를 설명한 표이다.
리턴 타입 | 메소드명(매개 변수) | 설명 |
V | get() | 작업이 완료될 때까지 블로킹되었다가 처리 결과 V를 리턴 |
V | get(long timeout, TimeUnit unit) | timeout 시간 전에 작업이 완료되면 결과 V를 리턴하지만, 작업이 완료되지 않으면 TimeoutException을 발생시킴 |
리턴 타입인 V는 submit(Runnable task, V result)의 두 번째 매개값인 V 타입이거나 submit(Callable<V> task)의 Callable 타입 파아미터 V 타입이다. 다음은 세 가지 submit() 메소드별로 Future의 get() 메소드가 리턴하는 값이 무엇인지 보여준다.
메소드 | 작업 처리 완료 후 리턴 타입 | 작업 처리 도중 예외 발생 |
submit(Runnable task) | future.get() -> null | future.get() -> 예외 발생 |
submit(Runnable task, Integer result) | future.get() -> int 타입 값 | future.get() -> 예외 발생 |
submit(Callable<String> task) | future.get() -> String 타입 값 | future.get() -> 예외 발생 |
Future를 이용한 블로킹 방식의 작업 완료 통보에서 주의할 점은 작업을 처리하는 스레드가 작업을 완료하지 전까지는 get() 메소드가 블로킹되므로 다른 코드를 실행할 수 없다. 만약 UI를 변경하고 이벤트를 처리하는 스레드가 get() 메소드를 호출하면 작업을 완료하기 전까지 UI를 변경할 수도 없고 이벤트를 처리할 수도 없게 된다. 그렇기 때문에 get() 메소드를 호출하는 스레드는 새로운 스레드이거나 스레드풀의 또 다른 스레드가 되어야 한다.
새로운 스레드를 생성해서 호출 | 스레드풀의 스레드가 호출 |
new Thread(new Runnable() { @Override public void run() { try { future.get(); } catch (Exception e) { e.printStackTrace(); } } }).start(); |
executorService.submit(new Runable() { @Override public void run() { try { future.get(); } catch (Exception e) { e.printStackTrace(); } } }); |
Future 객체는 작업 결과를 얻기 위한 get() 메소드 이외에도 다음과 같은 메소드를 제공한다.
리턴 타입 | 메소드명(매개 변수) | 설명 |
boolean | cancel(boolean mayInterruptIfRunning) | 작업 처리가 진행 중일 경우 취소시킴 |
boolean | isCancelled() | 작업이 취소되었는지 여부 |
boolean | isDone() | 작업 처리가 완료되었는지 여부 |
cancel() 메소드는 작업을 취소하고 싶을 경우 호출할 수 있다. 작업이 시작되기 전이라면 mayInterruptIfRunning 매개값과는 상관없이 작업 취소 후 true를 리턴하지만, 작업이 진행 중이라면 mayInterruptIfRunning 매개값이 true일 경우에만 작업 스레드를 interrupt한다. 작업이 완료되었을 경우 또는 어떤 이유로 인해 취소될 수 없다면 cancel() 메소드는 false를 리턴한다. isCancelled() 메소드는 작업이 완료되기 전에 작업이 취소되었을 경우에만 true를 리턴한다. isDone() 메소드는 작업이 정상적, 예외, 취소 등 어떤 이유에서건 작업이 완료되었다면 true를 리턴한다.
리턴값이 없는 작업 완료 통보
리턴값이 없는 작업일 경우는 Runnable 객체로 생성하면 된다. 다음은 Runnable 객체를 생성하는 방법을 보여준다.
Runnable task = new Runnable() {
@Override
public void run() {
// 스레드가 처리할 작업 내용
}
};
결과값이 없는 작업 처리 요청은 submit(Runnable task) 메소드를 이용하면 된다. 결과값이 없음에도 불구하고 다음과 같이 Future 객체를 리턴하는데, 이것은 스레드가 작업 처리를 정상적으로 완료했는지, 아니면 작업 처리 도중에 예외가 발생했는지 확인하기 위해서이다.
Future future = executorService.submit(task);
작업 처리가 정상적으로 완료되었다면 Future의 get() 메소드는 null을 리턴하지만 스레드가 작업처리 도중 interrupt되면 InterruptedException을 발생시키고, 작업 처리 도중 예외가 발생하면 ExecutionException을 발생시킨다. 그래서 다음과 같은 예외 처리 코드가 필요하다.
try {
future.get();
} catch(InterruptedException e) {
// 작업 처리 도중 스레드가 interrupt 될 경우 실행할 코드
} catch(ExecutionException e) {
// 작업 처리 도중 예외가 발생된 경우 실행할 코드
}
다음 예제는 리턴값이 없고 단순히 1부터 10까지의 합을 출력하는 작업을 Runnable 객체로 생성하고, 스레드풀의 스레드가 처리하도록 요청한 것이다.
// NoResultExample.java -- 리턴값이 없는 작업 완료 통보
public class NoResultExample {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()
);
System.out.println("[작업 처리 요청]");
Runnable runnable = new Runnable() {
@Override
public void run() {
int sum = 0;
for(int i=1; i<=0; i++) { sum += i; }
System.out.println("[처리 결과] " + sum);
}
};
Future future = executorService.submit(runnable);
try {
future.get();
System.out.println("[작업 처리 완료]");
} catch(Exception e) {
System.out.println("[실행 예외 발생함] " + e.getMessage());
}
executorService.shutdown();
}
}
리턴 값이 있는 작업 완료 통보
스레드풀의 스레드가 작업을 완료한 후에 애플리케이션이 처리 결과를 얻어야 된다면 작업 객체를 Callable로 생성하면 된다. 다음 은 Callable 객체를 생성하는 코드인데, 주의할 점은 제네릭 타입 파라미터 T는 call() 메소드가 리턴하는 타입이 되도록 한다.
Callable<T> task = new Callable<T>() {
@Override
public T call() throws Exception {
// 스레드가 처리할 작업 내용
return T;
}
};
Callable 작업의 처리 요청은 Runnable 작업과 마찬가지로 ExecutorService의 submit() 메소드를 호출하면 된다. submit() 메소드는 작업 큐에 Callable 객체를 저장하고 즉시 Future<T>를 리턴한다. 이때 T는 call() 메소드가 리턴하는 타입이다.
Future<T> future = executorService.submit(task);
스레드풀의 스레드가 Callable 객체의 call() 메소드를 모두 실행하고 T 타입의 값을 리턴하면,Future<T>의 get() 메소드는 블로킹이 해제되고 T 타입의 값을 리턴하게 된다.
try {
T result = future.get();
} catch(InterruptedException e) {
// 작업 처리 도중 스레드가 interrupt 될 경우 실행할 코드
} catch(ExecutionException e) {
// 작업 처리 도중 예외가 발생된 경우 실행할 코드
}
다음 예제는 1부터 10까지의 합을 리턴하는 작업을 Callable 객체로 생성하고, 스레드풀의 스레드가 처리하도록 요청한 것이다.
// ResultByCallableExample.java -- 리턴값이 있는 작업 완료 통보
public class ResultByCallableExample {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()
);
System.out.println("[작업 처리 요청]");
Callable<Integer> task = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int sum = 0;
for(int i=0; i<=10; i++) {
sum += i;
}
return sum;
}
};
Future<Integer> future = executorService.submit(task);
try {
int sum = future.get();
System.out.println("[처리 결과] " + sum);
System.out.println("[작업 처리 완료]");
} catch(Exception e) {
System.out.println("[실행 예외 발생함] " + e.getMessage());
}
executorService.shutdown();
}
}
작업 처리 결과를 외부 객체에 저장
상황에 따라서 스레드가 작업한 결과를 외부 객체에 저장해야 할 경우도 있다. 예를 들어 스레드가 작업 처리를 완료하고 외부 Result 객체에 작업 결과를 저장하면, 애플리케이션이 Result 객체를 사용해서 어떤 작업을 진행할 수 있을 것이다. 대개 Result 객체는 공유 객체가 되어, 두 개 이상의 스레드 작업을 취합할 목적으로 이용된다.
이런 작업을 하기 위해서 ExecutorService와 submit(Runnable task, V result) 메소드를 사용할 수 있는데, V가 바로 Result 타입이 된다. 메소드를 호출하면 즉시 Future<V>가 리턴되는데, Future의 get() 메소드를 호출하면 스레드가 작업을 완료할 때까지 블로킹되었다가 작업을 완료하면 V 타입 객체를 리턴한다. 리턴된 객체는 submit()의 두 번째 매개값으로 준 객체와 동일한데, 차이점은 스레드 처리 결과가 내부에 저장되어 있다는 것이다.
Result result = ...;
Runnable task = new Task(result);
Future<Result> future = executorService.submit(task, result);
result = future.get();
작업 객체는 Runnable 구현 클래스로 생성하는데, 주의할 점은 스레드에서 결과를 저장하기 위해 외부 Result 객체를 사용해야 하므로 생성자를 통해 Result 객체를 주입받도록 해야 한다.
class Task implements Runnable {
Result result;
Task(Result result) { this.result = result; }
@Override
public void run() {
// 작업 코드
// 처리 결과를 result 저장
}
}
다음 예제는 1부터 10까지의 합을 계산하는 두 개의 작업을 스레드풀에 처리 요청하고, 각각의 스레드가 작업 처리를 완료한 후 산출된 값을 외부 Result 객체에 누적하도록 했다.
// ResultByRunnableExample.java -- 작업 처리 결과를 외부 객체에 저장
public class ResultByRunnableExample {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()
);
System.out.println("[작업 처리 요청]");
class Task implements Runnable {
Result result;
Task(Result result) {
this.result = result;
}
@Override
public void run() {
int sum = 0;
for(int i=0; i<=10; i++) {
sum += i;
}
result.addValue(sum);
}
}
Result result = new Result();
Runnable task1 = new Task(result);
Runnable task2 = new Task(result);
Future<Result> future1 = executorService.submit(task1, result);
Future<Result> future2 = executorService.submit(task2, result);
try {
result = future1.get();
result = future2.get();
System.out.println("[처리 결과] " + result.accumValue);
System.out.println("[작업 처리 완료]");
} catch(Exception e) {
e.printStackTrace();
System.out.println("[실행 예외 발생함] " + e.getMessage());
}
executorService.shutdown();
}
}
class Result {
int accumValue;
synchronized void addValue(int value) {
accumValue += value;
}
}
작업 완료 순으로 통보
작업 요청 순서대로 작업 처리가 완료되는 것은 아니다. 작업의 양과 스레드 스케줄링에 따라서 먼저 요청한 작업이 나중에 완료되는 경우도 발생한다. 여러 개의 작업들이 순차적으로 처리될 필요성이 없고, 처리 결과도 순차적으로 이용할 필요가 없다면 작업 처리가 완료된 것부터 결과를 얻어 이용하면 된다. 스레드풀에서 작업 처리가 완료된 것만 통보받는 방법이 있는데, CompletionService를 이용하는 것이다. CompletionService는 처리 완료된 작업을 가져오는 poll()과 take() 메소드를 제공한다.
리턴 타입 | 메소드명(매개 변수) | 설명 |
Future<V> | poll() | 완료된 작업의 Future를 가져옴. 완료된 작업이 없다면 즉시 null을 가져옴. |
Future<V> | poll(long timeout, TimeUnit unit) | 완료된 작업의 Future를 가져옴. 완료된 작업이 없다면 timeout까지 블로킹됨. |
Future<V> | take() | 완료된 작업의 Future를 가져옴. 완료된 작업이 없다면 있을 때까지 블로킹됨. |
Future<V> | submit(Callable<V> task) | 스레드풀에 Callable 작업 처리 요청 |
Future<V> | submit(Runnalbe task, V result) | 스레드풀에 Runnable 작업 처리 요청 |
CompletionService 구현 클래스는 ExecutorCompletionService<V>이다. 객체를 생성할 때 생성자 매개값으로 ExecutorService를 제공하면 된다.
ExecutorService executorService = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()
);
CompletionService<V> completionService = new ExecutorCompletionService<V>(
executorService
);
poll()과 take() 메소드를 이용해서 처리 완료된 작업의 Future를 얻으려면 CompletionService의 submit() 메소드로 작업 처리 요청을 해야 한다.
completionService.submit(Callable<V> task);
completionService.submit(Runnable task, V result);
다음은 take() 메소드를 호출하여 완료된 Callable 작업이 있을 때까지 블로킹되었다가 완료된 작업의 Future를 얻고, get() 메소드로 결과값을 얻어내는 코드이다. while문은 애플리케이션이 종료될 때까지 반복 실행해야 하므로 스레드풀의 스레드에서 실행하는 것이 좋다.
executorService.submit(new Runnable() {
@Override
public void run() {
while(true) {
try {
Future<Integer> future = completionService.take();
int value = future.get();
System.out.println("[처리 결과] " + value);
} catch(Exception e) {
break;
}
}
}
});
take() 메소드가 리턴하는 완료된 작업은 submit()으로 처리 요청한 작업의 순서가 아님을 명심해야 한다. 작업의 내용에 따라서 먼저 요청한 작업이 나중에 완료될 수도 있기 때문이다. 더 이상 완료된 작업을 가져올 필요가 없다면 take() 블로킹에서 빠져나와 while문을 종료해야 한다. ExecutorService의 shutdownNow()를 호출하면 take() 에서 InterruptedException이 발생하고 catch 절에서 break가 되어 while문을 종료하게 된다. 다음 예제는 3개의 Callable 작업을 처리 요청하고 처리가 완료되는 순으로 작업의 결과값을 콘솔에 출력하도록 했다.
// CompletionServiceExample.java -- 작업 완료 순으로 통보받기
public class CompletionServiceExample extends Thread {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()
);
CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(executorService);
System.out.println("[작업 처리 요청]");
for(int i=0; i<3; i++) {
completionService.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int sum = 0;
for(int i=1; i<=10; i++) {
sum += i;
}
return sum;
}
});
}
System.out.println("[처리 완료된 작업 확인]");
executorService.submit(new Runnalbe() {
@Override
public void run() {
while(true) {
try {
Future<Integer> future = completionService.take();
int value = future.get();
System.out.println("[처리 결과] " + value);
} catch(Exception e) {
break;
}
}
}
});
try { Thread.sleep(3000); }
catch(InterruptException e) {}
executorService.shutdownNow();
}
}