스레드 직접 사용할 때의 문제점
1. 스레드 생성 비용으로 인한 성능 문제
- 메모리 할당
- 각 스레드는 자신만의 호출 스택을 가진다
- 보통 1MB 이상의 메모리를 사용한다
- OS 자원 사용
- 스레드 생성은 OS 커널 수준에서 이루어지며, 시스템 호출을 통해 처리된다
- OS 스케줄러 설정
- OS 스케줄러가 스레드를 관리하고 실행 순서를 조정한다
스레드를 생성하는 작업은 상대적으로 무겁다. 아주 가벼운 작업이라면 작업의 실행 시간보다 생성 시간이 더 오래 걸릴 수도 있다
이러한 문제는 스레드 재사용을 통해 해결할 수 있다
2. 스레드 관리 문제
서버의 CPU, 메모리 자원은 한정되어 있기에 스레드는 무한히 만들 수 없다
사용자의 요청을 처리하는 스레드가 평소에는 100개면 충분했는데, 10,000명의 사용자가 몰려 10,000개의 스레드를 생성해야 한다면 서버는 자원 고갈로 정상적으로 동작할 수 없을 것이다
이 문제를 해결하기 위해 최대 스레드 개수를 정해야 한다
또 안전한 종료를 위해 우아한 종료를 지원해야 한다거나 급하게 종료해야 해서 인터럽트를 보내야 한다면 스레드가 관리 대상이어야 한다
3. Runnable 인터페이스의 불편함
- 반환형이 void이다
- 스레드의 실행 결과를 직접 받을 수 없다
- 스레드가 실행한 결과를 멤버 변수에 담아두고, join() 메서드 등을 호출해 스레드 종료를 기다리고 그 후에 Getter 호출하는 식으로 해야 함
- 스레드의 실행 결과를 직접 받을 수 없다
- 예외 처리
- run() 메서드는 검사 예외를 던질 수 없다
- 예외 처리를 메서드 내부에서 처리해야 한다
- run() 메서드는 검사 예외를 던질 수 없다
위 문제들의 해결법
1, 2번 문제를 해결하기 위해서는 Pool이 필요하다
스레드 풀에 있는 스레드는 처리할 작업이 없으면 WAITING, 있으면 RUNNABLE 상태로 변경해야 한다. 클라이언트는 생상자가 되고 스레드 풀은 소비자가 될 것이다
이를 직접 구현하려면 매우 복잡하다. 하지만 Executor 프레임워크가 이런 문제들을 해결해준다
Executor
인터페이스
java.util.concurrent.Executor
- void execute(Runnable command);
java.util.concurrent.ExecutorService
- 서비스 종료
- void shutdown();
- 더이상 새 작업을 받지 않는다
- 새 작업을 요청하면 RejectedExecutionException 발생
- 이미 제출된 작업을 모두 완료 후 종료
- ThreadPool과 BlockingQueue에 있는 모든 작업
- 스레드 풀의 자원을 정리한다
- None blocking
- 더이상 새 작업을 받지 않는다
- List<Runnable> shutdownNow();
- 실행 중인 작업을 중단하기 위해 인터럽트 발생
- 대기 작업들을 반환하며 즉시 종료
- BlockingQueue에 있는 작업을 List<Runnable> 타입으로 반환
- 스레드 풀의 스레드에 인터럽트 발생
- 스레드 풀의 자원을 정리한다
- None blocking
- void close()
- 자바 19부터 지원
- 우아한 종료 구현
- shutdown() 메서드를 호출하고 하루가 지나도 작업이 완료되지 않으면 shutdownNow() 메서드 호출
- 보통 30~60초 정도의 시간이 적당하기에, 직접 구현하자
- 호출한 스레드에 인터럽트가 발생해도 shutdownNow() 메서드 호출
- void shutdown();
- 서비스 상태 확인
- boolean isShutdown();
- 서비스 종료 확인
- boolean isTerminated();
- shutdown 관련 메서드 호출 후 모든 작업이 완료되었는지 확인
- boolean isShutdown();
- 작업 완료 대기
- boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
- 서비스 종료 시 모든 작업 완료될 때까지 지정된 시간만큼 대기
- 시간 안에 작업이 모두 종료되면 true, 그렇지 않으면 false 반환
- Blocking
- boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
- 작업 제출
- 작업을 스레드 풀에 제출하여, 풀 내에 실행 가능한 스레드가 있을 경우 즉시 실행
- <T> Future<T> submit(Callable<T> task);
- <T> Future<T> submit(Runnable task, T result);
- Future<?> submit(Runnable task);
- 작업 컬렉션 제출
- 작업 컬렉션 처리. 여러 작업을 한 번에 처리 가능
- <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
- 작업 컬렉션을 제출하고 모든 작업이 끝날 때까지 기다린다
- <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
- 위와 동일. 지정된 시간 내에 수행
- <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
- 여러 작업 중 가장 먼저 완료된 작업의 결과 반환
- 완료되지 않은 나머지 작업은 취소된다
- <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
구현체
java.util.concurrent.ThreadPoolExecutor
구성 요소
- 스레드 풀
- 스레드 관리
- Blocking queue
- 작업 관리
- 생산자-소비자 문제 해결을 위해 대기 큐 사용
실제로 스레드 풀에서 스레드를 꺼내고 반납하지는 않고 상태를 RUNNABLE, WAITING 상태로 바꾸는 것이다
https://github.com/venzersiz/learn-java8/tree/master/src/test/java/concurrency/executor
스레드 풀 관리
여기서는 ThreadPoolExecutor 기준으로 설명
생성자의 매개변수
- int corePoolSize
- 기본 스레드 수
- int maximumPoolSize
- 최대 스레드 수
- long keepAliveTime, TimeUnit unit
- 기본 스레드 수를 초과해 만들어진 스레드가 생존할 수 있는 대기시간
- 이 시간이 지나면 초과 스레드는 제거된다
- BlockingQueue<Runnable> workQueue
- 작업을 보관할 대기 큐
로직
- 작업을 요청하면 corePoolSize만큼 스레드를 만든다
- corePoolSize를 넘는 수의 작업이 요청되면 작업 큐에 작업을 넣고 대기시킨다
- workQueue의 크기를 넘는 수의 작업이 요청되면 maximumPoolSize만큼 임시로 사용되는 스레드를 더 만든다
- 큐가 가득차서 큐에 넣을 수 없으므로 큐에 있는 것보다 먼저 수행된다?
- maximumPoolSize를 넘는 수의 작업이 요청되면 더이상 받아들일 수 없으므로 RejectedExecutionException이 발생한다
스레드 미리 생성하기
응답시간이 중요한 서버라면 요청을 처음 받기 전에 스레드를 풀에 미리 생성해두고 싶을 수 있다
ThreadPoolExecutor 클래스의 prestartAllCoreThreads() 메서드를 사용하면 된다
스레드 풀 생성 전략
Executors 클래스를 통해 다음 전략을 제공한다
- newSingleThreadExecutor()
- 스레드 1개 사용
- 큐 크기에 제한이 없다 (LinkedBlockingQueue)
- 간단히 사용하거나 테스트 용도
- new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
- newFixedThreadPool(int nThreads)
- new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
- 정해진 스레드 수만큼만 사용
- 큐 크기에 제한이 없다
- 스레드 수가 고정되어 CPU, 메모리 자원을 예측 가능한 안정적인 방식
- 서버 자원은 여유가 있는데, 사용자만 점전 느려지는 문제가 발생할 수 있다
- newCachedThreadPool()
- new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>())
- 기본 스레드를 사용하지 않음
- 1분의 생존 주기를 가진 초과 스레드만 사용
- 큐에 작업을 저장하지 않는다 (SynchronousQueue)
- 생산자의 요청을 스레드 풀의 소비자 스레드가 바로 처리
- 모든 요청이 큐에서 대기하지 않고 바로 처리하여 빠른 처리 가능
- 초과 스레드의 수에 제한이 없기 때문에 CPU, 메모리 자원만 허용하면 시스템 자원을 최대로 사용할 수 있다
- 매우 빠르고 유연한 전략
- 1분간 생존하기 때문에 작업 수에 맞추어 적절한 수의 스레드가 재사용된다
- 요청이 갑작스러운 증감에 스레드도 대응한다
- 서버 자원을 최대로 활용하지만, 서버가 감당할 수 있는 임계점을 넘는 순간 시스템이 다운될 수 있다
사용자 정의 스레드 풀 생성 전략
세분화된 전략이 필요하다
- 보통 상황
- CPU, 메모리 자원을 예측할 수 있도록 고정 크기의 스레드 풀로 서비스를 안정적으로 운영
- 긴급 상황
- 사용자 요청이 갑자기 증가하면 긴급하게 스레드를 추가 투입해 작업을 빠르게 처리
- 요청 거절
- 사용자 요청이 픅증하여 긴급 대응도 어렵다면 요청을 거부한다
어떤 경우에도 시스템이 다운되는 최악의 상황은 피해야 한다
스레드 풀 예외 정책
소비자가 처리할 수 없을 정도로 생산 요청이 가득차면 어떻게 할지를 결정해야 한다
개발자가 인지할 수 있는 로그를 남기고 사용자에게 시스템에 문제가 있음을 알려줘야 한다
ThreadPoolExecutor가 작업을 거부하는 정책
- AboryPolicy
- 기본
- 새 작업 제출 시 RejectedExecutionException 발생
- DiscardPolicy
- 새 작업을 버린다
- CallerRunsPolicy
- 새 작업을 제출한 스레드가 대신 작업 실행
- 생산자가 소비자의 일도 하기 때문에 생산 자체가 느려진다
- 생산 속도가 너무 빠를 때 생산 속도를 조절할 수 있다
- 사용자 정의
- RejectedExecutionHandler 인터페이스 직접 구현
shutdown 후 요청을 거부할 때도 같은 정책이 적용된다
Callable
interface Callable<V>
- V call() throws Exception;
Generic V 타입을 반환할 수 있고, 모든 예외를 던질 수 있다
Future
미래의 결과를 받을 수 있는 객체
스레드 풀의 스레드는 미래의 어느 시점에 실행된다(혹은 바로 실행될 수도 있다)
ExecutorService와 Future를 사용하면 마치 싱글 스레드를 사용하는 것처럼 프로그래밍할 수 있다
내부에 작업의 완료 여부/결과값을 가진다
요청 스레드가 future의 get() 메서드를 호출하면 Future가 완료 상태가 될 때까지 대기한다. 요청 스레드의 상태는 WAITING이 된다
스레드가 어떤 결과를 얻기 위해 대기하는 것을 Blocking이라고 한다
- Thread.join()
- Future.get()
ExecutorService의 처리 절차
- 요청 스레드가 작업 스레드에게 작업을 요청하고 WAITING 상태로 바뀌어 대기한다
- 작업 스레드는 작업을 완료한 후 Future에 반환값을 담는다
- 작업 스레드는 Future의 상태값을 완료로 변경한다
- 작업 스레드는 요청 스레드를 깨운다
- 요청 스레드는 RUNNABLE 상태가 된다
메서드
- boolean cancel(boolean mayInterruptIfRunning);
- 완료되지 않은 작업 취소
- 취소 상태로 변경한다
- mayInterruptIfRunning
- true
- 작업이 실행 중이면 Thread.interrupt() 메서드를 호출하여 작업 중단
- false
- 작업이 실행 중이면 중단하지 않음
- true
- 반환값
- true
- 작업이 취소됨
- false
- 이미 완료되었거나 취소할 수 없는 경우
- true
- boolean isCancelled();
- boolean isDone();
- 작업 성공 여부와 무관하게 작업이 끝났는지 확인
- V get() throws InterruptedException, ExecutionException;
- 작업이 완료될 때까지 대기하고, 완료되면 결과 반환
- 취소 상태라면 CancellationException 런타임 예외 발생
- 예외
- ExecutionException
- 작업 중에 예외가 발생한 경우
- ExecutionException
- V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
- 위와 동일
- 주어진 시간 내에 작업이 완료되지 않으면 TimeoutException 발생
- Future.State state()
- 자바 19부터 지원
'Java > Concurrency' 카테고리의 다른 글
| Java > Concurrency > 9. Concurrent Collections (0) | 2024.10.14 |
|---|---|
| Java > Concurrency > Atomic operation, CAS (0) | 2024.10.04 |
| Java > Concurrency > 7. Synchronization (0) | 2024.09.23 |
| Java > Concurrency > 6. Memory visibility (0) | 2024.09.13 |
| Java > Concurrency > 3. Thread info (0) | 2024.09.05 |