Java/Concurrency

Java > Concurrency > 10. Thread pool, Executor framework

Krevis 2024. 10. 14. 08:16

스레드 직접 사용할 때의 문제점

1. 스레드 생성 비용으로 인한 성능 문제

  • 메모리 할당
    • 각 스레드는 자신만의 호출 스택을 가진다
    • 보통 1MB 이상의 메모리를 사용한다
  • OS 자원 사용
    • 스레드 생성은 OS 커널 수준에서 이루어지며, 시스템 호출을 통해 처리된다
  • OS 스케줄러 설정
    • OS 스케줄러가 스레드를 관리하고 실행 순서를 조정한다

 

스레드를 생성하는 작업은 상대적으로 무겁다. 아주 가벼운 작업이라면 작업의 실행 시간보다 생성 시간이 더 오래 걸릴 수도 있다

 

이러한 문제는 스레드 재사용을 통해 해결할 수 있다

2. 스레드 관리 문제

서버의 CPU, 메모리 자원은 한정되어 있기에 스레드는 무한히 만들 수 없다

 

사용자의 요청을 처리하는 스레드가 평소에는 100개면 충분했는데, 10,000명의 사용자가 몰려 10,000개의 스레드를 생성해야 한다면 서버는 자원 고갈로 정상적으로 동작할 수 없을 것이다

 

이 문제를 해결하기 위해 최대 스레드 개수를 정해야 한다

 

또 안전한 종료를 위해 우아한 종료를 지원해야 한다거나 급하게 종료해야 해서 인터럽트를 보내야 한다면 스레드가 관리 대상이어야 한다

3. Runnable 인터페이스의 불편함

  • 반환형이 void이다
    • 스레드의 실행 결과를 직접 받을 수 없다
      • 스레드가 실행한 결과를 멤버 변수에 담아두고, join() 메서드 등을 호출해 스레드 종료를 기다리고 그 후에 Getter 호출하는 식으로 해야 함
  • 예외 처리
    • run() 메서드는 검사 예외를 던질 수 없다
      • 예외 처리를 메서드 내부에서 처리해야 한다

위 문제들의 해결법

1, 2번 문제를 해결하기 위해서는 Pool이 필요하다

 

스레드 풀에 있는 스레드는 처리할 작업이 없으면 WAITING, 있으면 RUNNABLE 상태로 변경해야 한다. 클라이언트는 생상자가 되고 스레드 풀은 소비자가 될 것이다

 

이를 직접 구현하려면 매우 복잡하다. 하지만 Executor 프레임워크가 이런 문제들을 해결해준다

 

 

Executor

인터페이스

java.util.concurrent.Executor

  • void execute(Runnable command);

java.util.concurrent.ExecutorService

  • 서비스 종료
    • void shutdown();
      • 더이상 새 작업을 받지 않는다
        • 새 작업을 요청하면 RejectedExecutionException 발생
      • 이미 제출된 작업을 모두 완료 후 종료
        • ThreadPool과 BlockingQueue에 있는 모든 작업
      • 스레드 풀의 자원을 정리한다
      • None blocking
    • List<Runnable> shutdownNow();
      • 실행 중인 작업을 중단하기 위해 인터럽트 발생
      • 대기 작업들을 반환하며 즉시 종료
        • BlockingQueue에 있는 작업을 List<Runnable> 타입으로 반환
        • 스레드 풀의 스레드에 인터럽트 발생
      • 스레드 풀의 자원을 정리한다
      • None blocking
    • void close()
      • 자바 19부터 지원
      • 우아한 종료 구현
        • shutdown() 메서드를 호출하고 하루가 지나도 작업이 완료되지 않으면 shutdownNow() 메서드 호출
        • 보통 30~60초 정도의 시간이 적당하기에, 직접 구현하자
      • 호출한 스레드에 인터럽트가 발생해도 shutdownNow() 메서드 호출
  • 서비스 상태 확인
    • boolean isShutdown();
      • 서비스 종료 확인
    • boolean isTerminated();
      • shutdown 관련 메서드 호출 후 모든 작업이 완료되었는지 확인
  • 작업 완료 대기
    • boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
      • 서비스 종료 시 모든 작업 완료될 때까지 지정된 시간만큼 대기
      • 시간 안에 작업이 모두 종료되면 true, 그렇지 않으면 false 반환
      • Blocking
  • 작업 제출
    • 작업을 스레드 풀에 제출하여, 풀 내에 실행 가능한 스레드가 있을 경우 즉시 실행 
    • <T> Future<T> submit(Callable<T> task);
    • <T> Future<T> submit(Runnable task, T result);
    • Future<?> submit(Runnable task);
  • 작업 컬렉션 제출
    • 작업 컬렉션 처리. 여러 작업을 한 번에 처리 가능
    • <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
      • 작업 컬렉션을 제출하고 모든 작업이 끝날 때까지 기다린다
    • <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
      • 위와 동일. 지정된 시간 내에 수행
    • <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
      • 여러 작업 중 가장 먼저 완료된 작업의 결과 반환
      • 완료되지 않은 나머지 작업은 취소된다
    • <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

구현체

java.util.concurrent.ThreadPoolExecutor

구성 요소

  • 스레드 풀
    • 스레드 관리
  • Blocking queue
    • 작업 관리
    • 생산자-소비자 문제 해결을 위해 대기 큐 사용

 

실제로 스레드 풀에서 스레드를 꺼내고 반납하지는 않고 상태를 RUNNABLE, WAITING 상태로 바꾸는 것이다

 

 

https://github.com/venzersiz/learn-java8/tree/master/src/test/java/concurrency/executor

 

 

 

스레드 풀 관리

여기서는 ThreadPoolExecutor 기준으로 설명

 

생성자의 매개변수

  • int corePoolSize
    • 기본 스레드 수
  • int maximumPoolSize
    • 최대 스레드 수
  • long keepAliveTime, TimeUnit unit
    • 기본 스레드 수를 초과해 만들어진 스레드가 생존할 수 있는 대기시간
    • 이 시간이 지나면 초과 스레드는 제거된다
  • BlockingQueue<Runnable> workQueue
    • 작업을 보관할 대기 큐

 

로직

  1. 작업을 요청하면 corePoolSize만큼 스레드를 만든다
  2. corePoolSize를 넘는 수의 작업이 요청되면 작업 큐에 작업을 넣고 대기시킨다
  3. workQueue의 크기를 넘는 수의 작업이 요청되면 maximumPoolSize만큼 임시로 사용되는 스레드를 더 만든다
    • 큐가 가득차서 큐에 넣을 수 없으므로 큐에 있는 것보다 먼저 수행된다?
  4. maximumPoolSize를 넘는 수의 작업이 요청되면 더이상 받아들일 수 없으므로 RejectedExecutionException이 발생한다

 

 

스레드 미리 생성하기

응답시간이 중요한 서버라면 요청을 처음 받기 전에 스레드를 풀에 미리 생성해두고 싶을 수 있다

 

ThreadPoolExecutor 클래스의 prestartAllCoreThreads() 메서드를 사용하면 된다

 

 

스레드 풀 생성 전략

Executors 클래스를 통해 다음 전략을 제공한다

  • newSingleThreadExecutor()
    • 스레드 1개 사용
    • 큐 크기에 제한이 없다 (LinkedBlockingQueue)
    • 간단히 사용하거나 테스트 용도
    • new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
  • newFixedThreadPool(int nThreads)
    • new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
    • 정해진 스레드 수만큼만 사용
    • 큐 크기에 제한이 없다
    • 스레드 수가 고정되어 CPU, 메모리 자원을 예측 가능한 안정적인 방식
    • 서버 자원은 여유가 있는데, 사용자만 점전 느려지는 문제가 발생할 수 있다
  • newCachedThreadPool()
    • new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>())
    • 기본 스레드를 사용하지 않음
    • 1분의 생존 주기를 가진 초과 스레드만 사용
    • 큐에 작업을 저장하지 않는다 (SynchronousQueue)
      • 생산자의 요청을 스레드 풀의 소비자 스레드가 바로 처리
    • 모든 요청이 큐에서 대기하지 않고 바로 처리하여 빠른 처리 가능
    • 초과 스레드의 수에 제한이 없기 때문에 CPU, 메모리 자원만 허용하면 시스템  자원을 최대로 사용할 수 있다
    • 매우 빠르고 유연한 전략
    • 1분간 생존하기 때문에 작업 수에 맞추어 적절한 수의 스레드가 재사용된다
      • 요청이 갑작스러운 증감에 스레드도 대응한다
    • 서버 자원을 최대로 활용하지만, 서버가 감당할 수 있는 임계점을 넘는 순간 시스템이 다운될 수 있다

사용자 정의 스레드 풀 생성 전략

세분화된 전략이 필요하다

  • 보통 상황
    • CPU, 메모리 자원을 예측할 수 있도록 고정 크기의 스레드 풀로 서비스를 안정적으로 운영
  • 긴급 상황
    • 사용자 요청이 갑자기 증가하면 긴급하게 스레드를 추가 투입해 작업을 빠르게 처리
  • 요청 거절
    • 사용자 요청이 픅증하여 긴급 대응도 어렵다면 요청을 거부한다

어떤 경우에도 시스템이 다운되는 최악의 상황은 피해야 한다

 

 

스레드 풀 예외 정책

소비자가 처리할 수 없을 정도로 생산 요청이 가득차면 어떻게 할지를 결정해야 한다

개발자가 인지할 수 있는 로그를 남기고 사용자에게 시스템에 문제가 있음을 알려줘야 한다

 

ThreadPoolExecutor가 작업을 거부하는 정책

  • AboryPolicy
    • 기본
    • 새 작업 제출 시 RejectedExecutionException 발생
  • DiscardPolicy
    • 새 작업을 버린다
  • CallerRunsPolicy
    • 새 작업을 제출한 스레드가 대신 작업 실행
    • 생산자가 소비자의 일도 하기 때문에 생산 자체가 느려진다
      • 생산 속도가 너무 빠를 때 생산 속도를 조절할 수 있다
  • 사용자 정의
    • RejectedExecutionHandler 인터페이스 직접 구현

 

shutdown 후 요청을 거부할 때도 같은 정책이 적용된다

 

 

Callable

interface Callable<V>

  • V call() throws Exception;

 

Generic V 타입을 반환할 수 있고, 모든 예외를 던질 수 있다

 

 

Future

미래의 결과를 받을 수 있는 객체

스레드 풀의 스레드는 미래의 어느 시점에 실행된다(혹은 바로 실행될 수도 있다)

 

ExecutorService와 Future를 사용하면 마치 싱글 스레드를 사용하는 것처럼 프로그래밍할 수 있다

 

내부에 작업의 완료 여부/결과값을 가진다

 

요청 스레드가 future의 get() 메서드를 호출하면 Future가 완료 상태가 될 때까지 대기한다. 요청 스레드의 상태는 WAITING이 된다

 

스레드가 어떤 결과를 얻기 위해 대기하는 것을 Blocking이라고 한다

  • Thread.join()
  • Future.get()

 

ExecutorService의 처리 절차

  1. 요청 스레드가 작업 스레드에게 작업을 요청하고 WAITING 상태로 바뀌어 대기한다
  2. 작업 스레드는 작업을 완료한 후 Future에 반환값을 담는다
  3. 작업 스레드는 Future의 상태값을 완료로 변경한다
  4. 작업 스레드는 요청 스레드를 깨운다
  5. 요청 스레드는 RUNNABLE 상태가 된다

 

메서드

  • boolean cancel(boolean mayInterruptIfRunning);
    • 완료되지 않은 작업 취소
    • 취소 상태로 변경한다
    • mayInterruptIfRunning
      • true
        • 작업이 실행 중이면 Thread.interrupt() 메서드를 호출하여 작업 중단 
      • false
        • 작업이 실행 중이면 중단하지 않음
    • 반환값
      • true
        • 작업이 취소됨
      • false
        • 이미 완료되었거나 취소할 수 없는 경우
  • boolean isCancelled();
  • boolean isDone();
    • 작업 성공 여부와 무관하게 작업이 끝났는지 확인
  • V get() throws InterruptedException, ExecutionException;
    • 작업이 완료될 때까지 대기하고, 완료되면 결과 반환
    • 취소 상태라면 CancellationException 런타임 예외 발생
    • 예외
      • ExecutionException
        • 작업 중에 예외가 발생한 경우
  • V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
    • 위와 동일
    • 주어진 시간 내에 작업이 완료되지 않으면 TimeoutException 발생 
  • Future.State state()
    • 자바 19부터 지원