Back-end/이것이 자바다[신용권 한빛미디어]

스레드 풀

Ho's log 2021. 12. 12. 21:26

병렬 작업 처리가 많아지면 스레드 개수가 증가되고 그에 따른 스레드 생성과 스케줄링으로 인해 CPU가 바빠져 메모리 사용량이 늘어난다.

따라서 애플리케이션의 성능저하 된다.

 

갑작스런 병렬 작업의 폭증으로 인한 스레드의 폭증을 막으려면 스레드풀(ThreadPool)을 사용해야 한다.

스레드풀은 작업처리에 사용되는 스레드를 제한된 개수만큼 정해놓고

작업 큐 (Queue)에 들어노는 작업들을 하나씩 스레드가 맡아 처리한다

 

작업 처리가 끝난 스레드는 다시 작업 큐에서 새로운 작업을 가져와 처리한다.

그렇기 때문에 작업 처리 요청이 폭증되어도

스레드의 전체 개수가 늘어나지 않으므로 애플리케이션의 성능이 급격히 저하 되지 않는다.

 

자바는 스레드풀을 생성하고 사용할 수 있도록 java.util.concurrent 패키지에서

ExecutorService 인터페이스와 Executors 클래스를 제공하고 있다.

Executors의 다양한 정적 메소드를 이용해서 ExecutorService 구현 객체를 만들수 있는데

이것이 바로 스레드 풀이다.

 

다음 그림은 ExecutorService가 동작하는 방식을 보여준다 

 

 

스레드풀 생성 및 종료 


스레드풀 생성

ExecutorService 구현 객체는 Executors 클래스의 다음 두 가지 메소드 중 하나를 이용해서 간편하게 생성 

 

메소드명(매개) 초기 스레드 수  코어 스레드 수  최대 스레드 수
newCachedThreadPool() 0 0 Integer.MAX_VALUE
newFixedThreadPool(int nThread) 0 nThreads nThreads

 

초기 스레드 수은 ExecutorService 객체가 생성될 때 기본적으로 생성되는 스레드 수를 말하고,

코어 스레드 수는 스레드 수가 증가된 후 사용되지 않는 스레드를 스레드풀에서 제거할 때 

최대 스레드 수는 스레드풀에서 관리하는 최대 스레드 수이다.

newCachedThreadPool() 메소드로 생성된 스레드풀의 특징은 초기 스레드 개수와 코어 스레드 개수는 0개이고

스레드 개수보다 작업 개수가 많으면 새 스레드를 생성시켜 작업을 처리한다.

이론적으로 int 값이 가질 수 있는 최대값만큼 스레드가 추가되지만, 

운영체제의 성능과 상황에 따라 달라진다.

1개 이상의 스레드가 추가 되었을 경우 60초 동안 추가된 스레드가 아무 작업을 하지 않으면 추가된 스레드를 종료하고 풀에서 제거 한다.

다음은 newCachedThreadPool()을 호출해서 ExecutorService 구현 객체를 얻는 코드이다 

ExecutorService executorService = Executors.newCachedThreadPool();

 

newFixedThreadPool(int nThreads) 메소드로 생성된 스레드풀의 초기 스레드 개수는 0개이고,

코어 스레드 수는 nThreads이다.

스레드 개수보다 작업 개수가 많으면 새 스레드를 생성시키고 작업을 처리한다.

최대 스레드 개수는 매개값으로 준 nThreads 이다.

이 스레드풀은 스레드가 작업을 처리하지 않고 놀고 있더라도 스레드 개수가 줄지 않는다.

 

다음은 CPU 코어의 수만큼 최대 스레드를 사용하는 스레드풀을 생성 한다.

ExecutorService executroeService = Executors.newFixedThreadPool(Runtime.getRuntime().availaleProcessors());;

 

newCachetThreadPool() 과 newFixedThreadPool() 메소드를 사용하지 않고 코어 스레드 개수와 최대 스레드 개수를 설정하고 싶다면 직접 ThreadPoolExecutor 객체를 생성하면 된다.

사실 위 두가지 메소드도 내부적으로 ThreadPoolExecutor 객체를 생성해서 리턴한다.

 

다음은 초기 스레드 개수가 0개, 코어 스레드 개수가 3개, 최대 스레드 개수가 100개인 스레드풀을 생성한다.

그리고 코어 스레드 3개를 제외한 나머지 추가된 스레드가 120초 동안 놀고있을 경우 해당 스레드를 제거해서 스레드 수를 관리한다.

ExecutorService threadPool = new ThreadPoolExecutor(3,100,120L,TimeUnit.SECONDS, new SychronousQueue<Runnalbe>)

코어스레드 개수, 최대스레드 개수, 놀고있는 시간, 놀고있는 시간 단위, 작업큐 순 파라미터

 

스레드풀 종료

스레드풀의 스레드는 기본적으로 데몬 스레드가 아니기 때문에

main 스레드가 종료되더라도 작업을 처리하기 위해 계속 실행 상태로 남아있다. 

그래서 main() 메소드가 실행이 끝나도 애플리케이션 프로세는 종료되지 않는다. 

애플리케이션을 종료하려면 스레드풀을 종료시켜 스레드들이 종료 상태가 되도록 처리해주어야 한다

ExecutorService 는 종료와 관련해서 다음 세 개의 메소드를 제공하고 있다.

 

리턴 타입 메소드명(매개 변수) 설명
void shutdown() 현재 처리 중인 작업 뿐만 아니라 작업 큐에 대기하고 있는
모든 작업을 처리한 뒤에 스레드풀을 종료시킨다.
List<Runnable> shutdownNow() 현재 작업 처리 중인 스레드를 interrupt 해서 작업 중지를 시도하고 스레드 풀을 종료 시킨다.
리턴값을 작업 큐에 있는 미처리된 작업(Runnable)의 목록이다
boolean awaitTermination(long timeout, TimeUnit unit) shutdown()메소드 호출 이후 모든 작업처리를 timeout 시간 내에 완료하면 true를 리턴하고, 완료하지 못하면 작업 처리 중인 스레드를 interrupt하고 false를 리턴한다 

 

남아있는 작업을 마무리하고 스레드풀을 종료할 때에는 shutdown()을 일반적으로 호출하고, 남아있는 작업과는 상관없이 강제로 종료할 때에는 shutdownNow()를 호출한다.

executorService.shutdown();
executorService.shutdownNow();

 

 

 

작업 생성과 처리 요청


작업 생성

하나의 작업은 Runnnable 또는 Callalbe 구현 클래스로 표현한다.

Runnable과 Callable 의 차이점은 작업 처리 완료 후 리턴값이 있느냐 없느냐 이다.

다음은 작업을 정의하기 위해 Runnable과 Callable 구현 클래스를 작성하는 방법을 보여준다

Runnable 구현 클래스 Callable 구현 클래스
Runnable task = new Runnable() {

    @Override
    public void run(){
     // 스레드가 처리할 작업 내용
    }


}
Callable<T> task = new Callable<T>(){
    @Override
    public T call() Throws Exception{

     // 스레드가 처리할 작업 내용
     return T
    }




}

 

Runnable의 run() 메소드는 리턴값이 없고. Callable의 call() 메소드는 리턴값이 있다.

call()의 리턴 타입은 implements Callable<T>에서 지정한 T 타입이다. 

스레드 풀의 스레드는 작업 큐에서 Runnable 또는 Callable 객체를 가져와 run()과 call() 메소드를 실행한다.

 

 

작업 처리 요청 

작업 처리 요청이란 ExecutorService 의 작업 큐에 Runnalbe 또는 Callable 객체를 넣는 행위를 말한다.

ExecutorService 는 작업 처리 요청을 위해 다음 두가지 종류의 메소드를 제공한다. 

 

리턴 타입  메소드명(매개 변수) 설명
void execute(Runnable command) - Runnable을 작업 큐에 저장
- 작업 처리 결과를 받지 못함 
Future<?>
Future<V>
Future<V>
submit(Runnable task)
submit(Runnable task, V result)
submit(Callable<V> task)
- Runable 또는 Callable 을 작업 큐에 저장
- 리턴된 Future을 통해 작업 처리 결과를 얻을 수 있음

 

execute() 와 submit() 메소드의 차이점은 두 가지이다. 

하나는 execute()는 작업 처리 결과를 받지 못하고 submit() 은 작업 처리 결과를 받을 수 있도록 Future를 리턴한다.

또 다른 차이점은 execute()는 작업 처리 도중 예외가 발생하면 스레드가 종료되고 해당 스레드는 스레드풀에서 제거 된다

따라서 스레드풀은 다른 작업 처리를 위해 새로운 스레드를 생성한다.

반면에 submit()은 작업 처리 도중 예외가 발생하더라도 스레드는 종료되지 않고 다음 작업을 위해 재사용된다.

그렇게 가급적이면 스레드의 생성 오버헤더를 줄이기 위해서 submit() 을 사용하는것이 좋다.

 

 

 

Runnalbe 작업을 정의 할때 Integet.parseInt("삼")을 넣어 NumberFormatException 이 발생 하도록 유도.

10개의 작업을 execute() 와 submit() 메소드로 각각 처리 요청 했을 경우 스레드풀의 상태를 살펴보기

 

public class ExecuteExample{
	public static void main(String[] args) throws Exception{
    
       	ExecutoreService executoreService = Executores.newFixedThreadPool(2);
        // 최대개수가 2인 스레드 풀 생성
        
        for (int i = 0 ; i<10; i++){
        	Runnable runnable - new Runnable(){
            
            @Override
            public void run(){
            //스레드 총 개수 및 작업 스레드 이름 출력
            ThreadPollExecutore threadPoolExecutor = (ThreadPoolExecutor) execuorService;
            
            int poolsize = threadPoolExecutor.getPoolsize();
            String threadName - Thread.currentThread().getName();
            System.out.println("[ 총 스레드 개수 :  " + poolSize + "] 작업 스레드 이름 : " + threadName );
            
            //예외 발생 시킴
            int value = Integer.parseInt("삼");
            
            
            
            }
            
            };
            
            executorService.execute(runnable);
            //executorService.submit(runnable);
            
            Thread.sleep(10); // 콘솔에 출력시간을 주기 위해 0.01 알시 정지 시킴
        
        }
    
    	executorService.shutdown();
    
    }



}

 

- execute 일때

스레드풀의 스레드 최대 개수 2는 변함이 없지만,

실행 스레드의 이름을 보면 모두 다른 스레드가 작업을 처리하고 있다

이것은 작업 처리 도중 예외가 발생했기 때문에 해당 스레드는 제거되고 새 스레드가 계속 생성되기 때문이다 .

 

-submit 일때 

실행 결과를 보면 execut()와의 차이점을 발견할 수 있다.

예외가 발생하더라도 스레드가 종료되지 않고 계속 재사용되어 다른 작업을 처리하고 았는 것을 볼수있다.

 

 

블로킹 방식의 작업 완료 통보


ExcutetorService 의 submit() 메소드는 매개값으로 준 Runnable 또는 Callable 작업을 스레드 풀의 작업 큐에 저장하고 즉시 Future 객체를 리턴한다 .

 

리턴 타입 메소드명(매개변수) 설명
Future<?> submit(Runnable task) - Runnable 또는 Callable 을 작업 큐에 저장
- 리턴된 Future를 통해 작업 처리 결과를 얻음
Future<V> submit(Runnable task, V result)
Future<V> submit(Callable<V> task)

 

Future 객체는 작업 결과가 아니라 작업이 완료 될때까지 기다렸다가(지연했다가 = 블로킹되었다가) 최종 결과를 얻는 데 사용된다.

그래서 Future를 지연완료(pending completion) 객체라고 한다.

Future의 get() 메소드를 호출하면 스레드가 작업을 완료할 때 까지 블로킹되었다가 작업을 완료하면 처리 결과를 리턴한다.

이것이 블로킹을 사용하는 작업 완료 통보 방식이다 다음은 Future가 가지고 있는get() 메소드를 설명한 표이다 

 

리턴 타입 메소드명(매개변수) 설명
V get() 작업이 완료될 때까지 블로킹 되었다가 처리 결과 V를 리턴
V get(long timeout, TimeUnit unit) timeout 시간 전에 작업이 완료되면 결과 V를 리턴하지만, 
작업이 완료되지 않으면 TimeoutExceptiondmf 발생 시킴 

 

리턴 타입인 V는 submit(Runnable task, V result)의 두 번째 매개값인 V타입이거나

submit(Callable <V> task)의 Callable 타입 파라미터 V 타입이다. 

다음은 세가지 submit() 메소드 별로 Future 의 get() 메소드가 리턴하는 값이 무엇인지 보여준다.

 

메소드 작업 처리 완료 후 리턴 타입 작업 처리 도중 예외 발생 
submit(Runnalbe task) future.get() -> null future.get() -> 예외 발생
submit(Runnalbe task, Integer result) future.get() -> int 타입 값 future.get() -> 예외 발생
submit(Callable<String> task) future.get() -> String 타입 값 future.get() -> 예외 발생 

 

 

Future를 이용한 블로킹 방식의 작업 완료 통보에서 주의할 점은 작업을 처리하는 스레드가 작업을 완료

하기 전까지는

get() 메소드가 블로킹되므로 다른 코드를 실행 할 수 없다.

만약 UI를 변경하고 이벤트를 처리하는 스레드가 get() 메소드를 호출하면 작업을 완료하기 전까지 UI를 변경할 수도 없고 이벤트를 처리 할 수도 없게 된다.

그렇기 때문에 get() 메소드를 호출하는 스레드는 새로운 스레드이거나 스레드풀의 또 다른 스레드가 되어야 한다.

새로운 스레드를 생성해서 호출 스레드풀의 스레드가 호출
new Thread(new Runnable()) {
    @Override
     public void run(){
        try{
          future.get();
       
         } catch (Exception e){

           e.printStackTrace();
        }
     }



}).start();
excutetorService.submit(new Runable(){
    @Override
     public void run() {

        try{
          future.get();
         } catch (Exception e){
            e.printStackTrace();
          }

      }


});

 

Future 객체는 작업 결과를 얻기 위한 get() 메소드 이외에도 다음과 같은 메소드를 제공한다.

리턴 타입 메소드명(매개변수) 설명
boolean cancel(boolean mayInterruptIfRunning) 작업 처리가 진행 중일 경우 취소 시킴
boolean isCancelled() 작업이 취소 되었는지 여부
boolean isDone() 작업 처리가 완료되었는지 여부 

 

cancel() 메소드는 작업을 취소하고 싶을 경우 호출 할 수 있다.

작업이 시작되기 전이라면 MayInterruptIfRunning 매개값과는 상관없이 작업 취소 후 true를 리턴하지만,

작업이 진행 중 이라면 mayInterruptIfRunning 매개값이 true 일 경우에만 작업 스레드를 interrupt 한다

작업이 완료 되었을 경우 또는 어떤 이유로 인해 취소 될수 없다면 cancel() 메소드는 false 를 리턴한다.

 

isCancelled() 메소드는 작업이 완료되기 전에 작업이 취소되었을 경우에만 true를 리턴한다.

isDone() 메소드는 작업이 정상적, 예외, 취소 등 어떤 이유에서건 작업이 완료 되었다면 true를 리턴한다. 

 

리턴 값이 없는 작업 완료 통보

리턴값이 없는 작업일 경우는 Runnable 객체로 생성하면 된다. 

다음은 Runnable 객체를 생성하는 방법을 보여준다

Runnable task = new Runnbale(){
	@Override
    public void run(){
    	//스레드가 처리할 작업 내용 
    }
    
    


}

 

결과값이 없는 작업 처리 요청은 submit(Runnable task) 메소드를 이용하면 된다.

결과값이 없음에도 불구하고 다음과 같이 Future 객체를 리턴하는데,

이것은 스레드가 작업 처리를 정상적으로 완료했는지, 

아니면 작업 처리 도중에 예외가 발생했는지 확인하기 위해서이다.

 

Future future = executorService.submit(task);

 

작업 처리가 정상적으로 완료 되었다면 Future의 get() 메소드는 null을 리턴하지만 

스레드가 작업 처리 도중 inturrupt 되면 InterruptException 을 발생시키고, 작업 처리 도중 예외가 발생하면

ExecutionException을 발생시킨다. 

 

 

그래서 다음과 같은 예외 처리 코드가 필요하다

try{
	future.get();
} catch(InterruptedExceptuon e){
	//작업 처리 도중 스레드가 interrupt 될 경우 실행할 코드
} catch (Exception e ){
	//작업 처리 도중 예외가 발생된 경우 실행할 코드 
}

 

 

리턴값이 없고 단순히 1부터 10까지의 합을 출력하는 작업을 Runnalbe 객체로 생성 하고 스레드 풀의 스레드가 처리하도록 요청 

public class NoResultExample {

	public static void main(String[] args){
    	ExecutorService excutorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        
        System.out.println("작업 처리 요청");
        
        Runnable runnable = new Runaable() {
        
        	@Override
            public void run(){
            
            	int sum = 0 ;
                for (int i = 1 ; i <= 10; i ++){
                 sum += 1;
   
                }
            	System.out.println("처리결과" + sum);
            }
        
        };
        
        Future future = executorService.submit(runnable);
        
        try{
        	future.get();
            System.out.println("작업처리 완료")
        } catch (Exception e){
        	System.out.println("[실행 예외 발생함]" + e.getMessage );
        }
    
    	excutetor.shudown()
    }




};

 

리턴값이 있는 작업 완료 통보

스레드풀의 스레드가 작업을 완료한 후에 애플리케이션이 처리 결과를 얻어야 된다면 작업 객체를 Callable 으로 생성

 

다음은 Callable 객체를 생성하는 코드, 주의할 점은 제네릭 타입 파리미터 T 는 call() 메소드가 리턴하는 타입

 

Callable<T> task = new Callable<T>(){
	
    @Override
    public T call() throws Exception{
    	//스레드가 처리할 작업 내용
        return T
    }


};

 

Callable 작업의 처리 요청은 Runnable 작업과 마찬가지로 ExcutetorService의 submit() 메소드를 호출하면 된다.

submit() 메소드는 작업 큐에 Callable 객체를 저장하고 즉시 Future<T>를 리턴한다. 

이때 T 는 call() 메서드가 리턴하는 타입이다.

Future<T> future = executorService.submit(task);

 

스레드풀의 스레드가 Callable 객체의 call() 메소드를 모두 실행하고 T타입의 값을 리턴하면,

Future<T>의 get() 메소드는 블로킹이 해제되고 T 타입의 값을 리턴하게 된다.

try{

	T result = future.get();
} catch (InterruptedException e){
	// 작업 처리 도중 스레드가 interrupt 될 경우 실행할 코드


} catch(ExecutionException e){
	// 작업 처리 도충 예외가 발생된 경우 실행할 코드 
}

 

1부터 10까지 합을 리턴하는 작업을 Callable 객체로 생성하고 스레드풀의 스레드가 처리하도록 요청한것

public class ResultByCallableExample{
	public static void main(String[] args){
    
    	ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    
		System.out.println("작업 처리 요청")
        
        Callable<Integer> task = new Callable<Integer>() {
        	@Override
            public Integer call() throws Exception{
            
            	int sum = 0;
                for(int i = 1; i < 10; i ++){
                
                	sum += 1;
                }
            	return sum;
            }
        
        };
        Future<Integer> future = executorService.submit(task);
    	
        
        try {
         int sum = future.get();
         System.out.println("처리결과 " + sum);
         System.out.println("작업처리 완료")
        } catch (Exception e)
        {
        	System.out.println("실행 예외 발생")
        }
    	
        executorService.shutdown();
    }
    



}

 

작업 처리 결과를 외부 객체에 저장

상황에 따라서 스레드가 작업한 결과를 외부 객체에 저장해야 할 경우도 있다.

예를 들어 스레드 작업처리를 완료하고 외부 Result 객체에 작업 결과를 저장하면, 

애플리케이션이 Result객체를 사용해서 어떤 작업을 진행 할 수 있을 것이다. 

대개 Result 객체는 공유 객체가 되어, 두개 이상의 스레드 작업을 취합할 목적으로 이용된다.

 

이런 작업을 하기 위해서 ExecutorService의 submit(Runnable task, V Result) 메소드를 사용 할 수 있는데,

V가 바로 Result 의 타입이 된다. 메소드를 호출하면 즉시 Future<V>가 리턴되는데,

Future의 get() 메소드를 호출하면 스레드가 작업을 완료할때 까지 블로킹 되었다가 작업을 완료하면 V타입 객체를 리턴한다.

리턴한다 리턴된 객체는 submit()의 두번째 매개값으로 준 객체와 동일한데, 

차이점은 스레드 처리결과 내부에 저장되어 있다는 것이다.

 

Result result = ...;

Runnable task = new Task(result);

Future<Result> future = executetorService.submit(task, result);

result = future.get();

 

작업 객체는 Runnable 구현 클래스로 생성하는데,

주의할 점은 스레드에서 결과를 저장하기 위해 외부 Result 객체를 사용해야 하므로 생성자를 통해 Result 객체를 주입받고록 해야한다.

class Task implements Runnalbe {
	Result result;
    Task(Result result){this.result = result };
    
    @Override
    public void run(){
    
    	// 작업 코드
        // 처리 결과를 result 저장 
    }


}

 

1부터 10까지의 합을 계산하는 두 개의 작업을 스레드 풀에 처리 요청하고, 각각의 스레드가 작업 처리를 완료한 후 산출된 값을 외부 Result 객체에 누적하도록 했다

 

public class ResultByRunnableExample{
	public static void main(String[] args ){
    	ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        
        System.out.println("작업 처리 요청");
        
        class Task implements Runnable {
        	Result result;
            
            Task(Result result){
            	this.result = result;
            }
            
            @Override 
            public void run(){
            	int sum = 0;
                for(int i = 1; i <= 10; i++0){
                	sum+= i
                }
                result.addValue(sum);
            }
        
        
        }
        
        Result result = new Result();
        Runnable task1 = new Task(result);
        Runnable task2 = new Task(result);
        
        Future<Result> future1 = executorService.submit(task1, result);
        Future<Result> future2 = executorService.submit(task2, result);
        
        try {
        	result = future1.get();
            result = future2.get();
            
        } catch (Exception e){
        	e.printStackTrace();
            System.out.println("실행 예외" + e.getMessage());
        }
        
        executorService.shutdown();
    
    
    
    }



}

class Result {

	int accumValue;
    synchronized void addValue(int value){
    
    	accumValue += value;
    }

}

 

작업 완료 순으로 통보

작업 요청 순서대로 작업 처리가 완료 되는 것은 아니다.

작업의 양과 스레드 스케줄링에 따라서 먼저 요청한 작업이 나중에 완료 되는 경우도 발생 한다.

여러 개의 작업들이 순차적으로 처리될 필요성이 없고, 

처리 결과도 순차적으로 이용할 필요가 없다면 작업 처리가 완료된 것부터 결과를 얻어 이용하면 된다.

스레드풀에서 작업 처리가 완료된 것만 통보받는 방법이 있는데

Complentionservice를 이용하는 것이다.

CompletionService는 처리 완료된 작업을 가져오는 poll() take() 메소드를 제공 한다 

 

리턴 타입 메소드명(매개 변수) 설명
Future<V> poll() 완료된 작업의 Future 를 가져옴,
완료된 작업이 없다면 즉시 null을 리턴함
Future<V> poll(long timeout, TimeUnit unit) 완료된 작업의 Futuref를 가져옴,
완료된 작업이 없다면 timeout 까지 블로킹됨
Future<V> take() 완료된 작업의 Future를 가져옴
완료된 작업이 없다면 있을 때 까지 블로킹됨
Future<V> submit(Callable<V> task) 스레드풀에 Callable 작업 처리 요청
Future<V> submit(Runnable task, V result) 스레드풀에 Runnable 작업 처리 요청

 

CompletionService 구현 클래스는 ExecutorCompletionService<V>이다.  객체를 생성할 때 생성자 매개값으로 ExecutorService를 제공하면 된다.

 

ExecutorService executorService = Executors.newFixedThreadPool(
	Runtime.getRuntime().availableProcessors()
);

CompletionService<V> completionService = new ExecutorCompletionService<V>(

	executorService
);

 

poll() 과 take() 메소드를 이용해서 처리 완료된 작업의 Future를 얻으려면 CompletionService의 submit() 메소드로 작업 처리를 요청을 해야 한다.

completionService.submit(Callable<V> task);
completionService.submit(Runnable task, V result)

 

다음은 take() 메소드를 호출하여 완료된 Callable 작업이 있을 때 까지 블로킹 되었다가 완료된 작업의 Future를 얻고 ,

get() 메소드로 결과값을 얻어내는 코드이다. while 뭉은 애플리케이션이 종료될 때까지 반복 실행해야 하므로 스레드풀의 스레드에서 실행 하는 것이 좋다.

 

executorService.submit(new Runnable() {  // -> 스레드 풀의 스레드에서 실행하도록 함
	@Override
    public void run() {
    	while(true) {
        
        	try {
            	Future<Integer> future = completionService.take(); // -> 완료된 작업이 있을 때 까지 블로킹 / 완료된 작업이 잇으면 Future를 리턴
                int value = future.get(); // get()은 블로킹 되지 않고 바로 작업 결과를 리턴 
                System.out.println("[처리결과] " + value);
                
            
            } catch (Exception e){
            
            	break;
            }
        
        }
    }

})

take() 메소드가 리턴하는 완료된 작업은 submit()으로 처리 요청한 작업의 순서가 아님을 명시해야 한다.

작업의 내용에 따라서 먼저 요청한 작업이 나중에 완료 될 수도 있기 때문이다. 

더 이상 완료된 작업을 가져올 필요가 없다면 take() 블로킹에서 빠져나와 while 문을 종료해야 한다.

ExecutorService의 shutdownNow()를 호출하면 take() 에서 InterruptedException 이 발생하고 catch 절에서 break가

되어 while 뭉을 종료하게 된다.

 

다음 예제의 3개의 Callable작업을 처리 요청하고 처리가 완료되는 순으로 작업의 결과값을 콘솔에 출력하도록 했다.

// 작업 완료 순으로 통보 받기 

public class CompletionServiceExample extends Thread {
	public static void main(Stringp[ args){
    	
        ExecutoreService executorService = Executors.newFixedThreadPool(
        	Runtime.getRuntime().availableProcessors()
        );
        
        // CompletionService 생성
        CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(executorService);
  		
        
        System.out.println("[작업 처리 요청]");
        
        for(int i = 0; i <3; i++){
        	completionService.submit(new Callable<Integer>){
            
            	@Override
                public Integer call() throws Excepiton{
            	
                	int sum  = 0;
                    for(int i = 1 ; i <= 10; i++){
                    	sum += i;
                    }
                    return sum;
            }
        
        
        }
    
    	System.out.println("[처리 완료된 작업 확인]");
        executorService.submit(new Runnable() { // 스레드풀의 스레드에서 실행하도록 함
        	@Override
            public void run(){
            	while(true){
                	try {
                    	Future<Integer> future = completionService.take(); // 완료된 작업 가져오기
                        int value = future.get();
                        System.out.println("[처리 결과 ]" + value);
                        
                    
                    
                    } catch (Exception e){
                    	break;
                    }
                
                }
            
            }
        
        
        
        });
        
        try{Thread.sleep(3000);}
    	catch (InterruptedException e){
        	executorService.shutdownNow();
        }
    
    }




}

 

콜백 방식의 작업 완료 통보

이번에는 콜백(Callback) 방식을 이용해서 작업 완료 통보를 받는 방법에 대해서 알아보자. 

콜백이란 애플리케이션이 스레드에게 작업 처리를  요청한 후,

스레드가 작업을 완료하면 특정 메소드를 자동 실행하는 기법을 말한다.

이때 자동 실행되는 메소드를 콜백 메소드라고 한다.

다음은 블로킹방식과 콜백 방식을 비교한 그림이다.

 

 

블로킹 방식은 작업 처리를 요청한 후 작업이 완료될 때까지 블로킹되지만,

콜백 방식은 작업 처리를 요청한 후 결과를 기다릴 필요 없이 다른 기능을 수행할 수 있다.

그이유는 작업 처리가 완료되면 자동적으로 콜백 메소드가 실행되어 결과를 알 수 있기 때문이다.

 

아쉽게도 ExecutorService는 콜백을 위한 별도의 기능을 제공하지 않는다. 

하지만 Runnable 구현 클래스를 작성할 때 콜백 기능을 구현 할수 있다.

먼저 콜백 메소드를 가진 클래스가 있어야 하는데, 직접 정의해도 좋고

java.nio.channels.CompletionHandler를 이용해도 좋다.

이 인터페이스는 NIO패키지에 포함되어 있는데 비동기 통신에서 콜백 객체를 만들 때 사용된다.

그럼 CompletionHandler를 이용해서 콜백 객체를 만드는 방법을 살펴보자.

 

다음은 CompletionHandler 객체를 생성하는 코드이다.

 

CompletionHandler<V, A> callback = new CompletionHandler<V , A> () {
	@Override
    public void completed(V result, A attachment){
    }
	
    @Override
    public void failed(Throwable exc, A attachment){
    }

}

 

CompletionHandler 는 completed()와 failed()메소드가 있는데, completed()는 작업을 정상 처리 완료했을 때 

호출되는 콜백 메소드이고, failed()는 작업 처리 도중 예외가 발생했을 때 호출 되는 콜백 메소드 이다.

CompletionHandler의 V타입 파라미터는 결과값의 타입이고, A는 첨부값의 타입이다,

첨부값은 콜백 메소드에 결과값 이외에 추가적으로 전달하는 객체라고 생각하면 된다.

만약 첨부값이 필요 없다면 A는 Void 로 지정해주면 된다. 

다음은 작업 처리 결과에 따라 콜백 메소드를 호출하는 Runnable 객체이다.

Runnable task = new Runnable(){

	@Override
    public void run(){
    
    	try{
        
        	// 작업 처리
            V result = ..;
            callback.completed(result, null); --> 작업을 정상 처리했을 경우 호출
        } catch (Exception e){
        	callback.failed(e, null); --> 예외가  발생했을 경우 호출
        }
    
    
    }


}

 

 

작업 처리가 정상적으로 완료되면 completed() 콜백 메소드를 호출해서 결과값을 전달하고, 

예외가 발생하면 failed() 콜백 메소드를 호출해서 예외 객체를 전달한다.

 

 

다음은 두개의 문자열을 정수화 해서 더하는 작업을 처리하고 결과를 콜백 방식으로 통보한다.

 

첫 번째 작업은 "3", "3"을 주었고 두번째 작업은  "3", "삼"을 주었다.

첫번째 작업은 정상적으로 처리 되기 때무에 completed()가 자동으로 호출되고,

두번째 작업은 NumberFormatException이 발생되어 failed() 메소드가 호출된다.

 

public class Callback Example{

	private ExecutoreService executorService;
    
    public CallbackExample(){
    	executorService = Executors.newFixedThreadPool(
        	Runtime.getRunTime().availableProcesssors();
        )
    	
    }
    
    private CompletionHandler<Integer, Void> callback = new CompletionHandler<Integer, Void>(){
    	@Override
        public void completed(Integer result, Void attacmnet){
        	System.out.println("completed() 실행 : " + result);
            
            
        }
        
        @Override
        public void failed(Throwable exc, Void attachment){
        	System.out.println("failed() 실행: " + exc.toString());
        
        }
    
    
    };
    
    pulbic void doWork(final String x, final String y){
    	Runnable task = new Runnabble() {
        	@Override
            public void run(){
            	try{
                	int intX = Integer.parseInt(x);
                    int intY = Integer.parseInt(y);
                    
                    int result = intX + intY;
                    callback.completed(result, null); // 정상 처리했을 경우 호출
                    
                
                } catch(NumberFormatException e){
                	callback.failed(e, null); // 예외가 발생햇을 경우 호출
                }
            
            }
        
        
        };
        excutorService.submit(task); // 스레드풀에게 작업 처리 요청  
    
    }

	public void finish(){
    	executorService.shutdown(); // 스레드풀 종료
    
    }
    
    public static void main(String[] args){
    
    	CallbackExample example = new CallbackExample();
        example.doWork("3", "3");
        example.doWork("3", "삼");
        example.finish();
    }
}

 

 

 

'Back-end > 이것이 자바다[신용권 한빛미디어]' 카테고리의 다른 글

람다식  (0) 2022.04.06
[제네릭]  (0) 2022.02.12
스레드 그룹(ThreadGroup)  (0) 2021.12.12
스레드 상태 제어  (0) 2021.12.05
스레드 상태  (0) 2021.12.04