개발 공부 기록하기/01. JAVA & Kotlin

자바 동시성 자료 조사

lannstark 2020. 10. 18. 02:08

문득 자바 Thread에 대하여 깊이 있게 알고 싶다는 생각이 들었다.

Thread와 관련하여, 가지고 있는 Java 관련 책들이나 JDK docs를 읽으며 간단히 정리해보았다.

 

 

혼자 공부하는 자바

스레드가 사용중인 다른 객체를 다른 스레드가 변경할 수 없게 하려면 스레드 작업이 끝날 때까지 객체에 잠금을 걸어서 다른 스레드가 사용할 수 없도록 해야 한다.

  • 임계 영역 : 멀티 스레드 프로그램에서 단 하나의 스레드만 실행할 수 있는 코드 영역
  • 자바는 임계 영역을 지정하기 위해 동기화 메소드를 제공한다.
public synchronized void method() {
  // 임계 영역 : 단 하나의 스레드만 실행
}

동기화 메소드는 메소드 전체 내용이 임계 영역이므로 스레드가 동기화 메소드를 실행하는 즉시 객체에는 잠금이 일어나고, 스레드가 동기화 메소드를 실행 종료하면 잠금이 풀린다.

 

여러개의 동기화 메소드가 존재할 경우, 스레드 A가 이들 중 하나를 실행할 때 스레드 B는 다른 동기화 메소드들도 실행할 수 없다. 하지만 이때 다른 스레드에서 일반 메소드는 실행이 가능하다.

Thread Life Cycle

REF : https://www.baeldung.com/java-thread-lifecycle

  • Thread 객체가 생성하고 start() 메소드가 호출되었을때 바로 실행되는 것이 아니라 실행 대기 상태가 된다. 언제든지 실행할 준비가 되어 있는 상태를 뜻한다. OS는 실행 대기 상태에 있는 스레드 중에서 하나를 선택해서 실행 상태로 만든다.
  • run() 메소드의 내용이 모두 실행되면 스레드의 실행이 멈추고 종료 상태가 된다.
  • interrupt() : 일시 정지 상태의 스레드에서 InterruptedException을 발생시켜, 예외 처리 코드(catch)에서 실행 대기 상태로 가거나 종료 상태로 갈 수 있도록 한다.
  • sleep(long millis) : 주어진 시간 도안 스레드를 일시 정지 상태로 만든다. 주어진 시간이 지나면 자동적으로 실행 대기 상태가 된다.
  • stop() : 스레드를 즉시 종료한다. 불안전한 종료를 유발하므로 사용하지 않는 것이 좋다.

 

Non-Runnable 3가지 상태

  • Time Waiting : 주어진 시간 동안 action을 하지 않는 TIMED_WAITING 상태
  • Waiting : 다른 스레드의 특정한 action을 기다리는 WAITING 상태
  • Blocked : 다른 스레드에 의해 lock되어 있는 구역을 기다리는 BLOCKED 상태

스레드의 안전한 종료

Thread를 즉시 종료하기 위한 stop() 메소드는 deprecated되었다. stop() 메소드로 스레드를 갑자기 종료하게 되면 스레드가 사용 중이던 자원들이 불안전한 상태로 남겨지기 때문이다.

REF : https://www.baeldung.com/java-thread-stop

Using a Flag

public class ControlSubThread implements Runnable {
  private Thread worker;
  private final AotmicBoolean running = new AtomicBoolean(false);
  private int interval;

  public ControlSubThread(int sleepInterval) {
    interval = sleepInterval;
  }

  public void start() {
    worker = new Thread(this);
    worker.start();
  }

  public void stop() {
    running.set(false);
  }

  public void run() {
    running.set(true);
    while (running.get()) {
      try {
        Thread.sleep(interval);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } 
      // do something here
    }
  }
}

Interrupting a Thread

스레드에는 sleep()이 긴 시간 호출되거나 BLOCKING 상태가 지속될 수 있는 risk가 존재한다.

이런 상황을 위해 interrupt 작업을 할 수 있다.

public class ControlSubThread implements Runnable {
  private Thread worker;
  private final AotmicBoolean running = new AtomicBoolean(false);
  private int interval;

  public void interrupt() {
    running.set(false);
    worker.interrupt();
  }

  boolean isRunning() {
    return running.get();
  }

  boolean isStopped() {
    return stopped.get();
  }

  public void run() {
    running.set(true);
    stopped.set(false);
    while (running.get()) {
      try {
        Thread.sleep(interval);
      } catch (InterruptedException e){
        Thread.currentThread().interrupt();
        System.out.println(
          "Thread was interrupted, Failed to complete operation");
      }
      // do something
    }
    stopped.set(true);
  }
}

데몬 스레드 : 주 스레드의 작업을 돕는 보조적인 역할을 수행하는 스레드이다. 주 스레드가 종료되면 데몬 스레드는 강제적으로 종료되는데, 그 이유는 주 스레드의 보조 역할을 수행하므로 주 스레드가 종료되면 데몬 스레드의 존재 의미가 사라지기 때문이다.

이것이 자바다

Thread의 상태가 변경되는 method

  • interrupt() : 일시 정지 상태의 스레드에게 InterruptedException 예외를 발생시켜, 예외 처리 코드(catch)에서 실행 대기 상태로 가거나 종료 상태로 갈 수 있도록 한다.
  • notify(), notifyAll() : 동기화 블록 내에서 wait() 메소드에 의해 일시 정지 상태에 있는 스레드를 실행 대기 상태로 만든다.
  • sleep() : 주어진 시간 동안 스레드를 일시 정지 상태로 만든다. 주어진 시간이 지나면 자동적으로 실행 대기 상태가 된다.
  • join() : join() 메소드를 호출한 스레드는 일시 정지 상태가 된다. 실행 대기 상태로 가려면, join() 메소드를 멤버로 가지는 스레드가 종료되거나, 매개값으로 주어진 시간이 지나야 한다. (다른 스레드의 종료를 기다리는 상태)
  • wait() : 동기화 블록 내에서 스레드를 일시 정지 상태로 만든다. 매개값으로 주어진 시간이 지나면 자동적으로 실행 대기 상태가 된다. 시간이 주어지지 않으면 notify(), notifyAll() 메소드에 의해 실행 대기 상태로 갈 수 있다.
  • yield() : 실행 중에 우선순위가 동일한 스레드에게 실행을 양보하고 실행 대기 상태가 된다.

ThreadGroup

ThreadGroup은 관련된 스레드를 묶어서 관리할 목적으로 이용된다. JVM이 실행되면 system 스레드 그룹을 만들고, JVM 운영에 필요한 스레드들을 생성해서 system 스레드 그룹에 포함시킨다. 스레드는 반드시 하나의 스레드 그룹에 포함되는데, 명시적으로 스레드 그룹에 포함시키지 않으면 기본적으로 자신을 생성한 스레드와 같은 스레드 그룹에 속하게 된다.

// 현재 스레드가 속한 스레드 그룹 알아내기
ThreadGroup tg = Thread.currentThread().getThreadGroup();

// 프로세스 내에서 실행하는 모든 스레드에 대한 정보 알아내기
Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();

실제 아무것도 띄우지 않은 pure java main method에서 돌고 있는 스레드들을 살펴보자

system 그룹에 3가지 Thread가 존재하고 main 그룹에 1가지 Thread가 존재한다.

  • Signal Dispatcher : 이름 그대로 java process에 전달되는 signal 을 캐치해주는 thread
  • main : 메인 스레드
  • Finalizer : finalizer method를 호출하는 Thread (GC 관련)
  • Reference Handler : 대기하는 reference들을 enqueue하는 thread (GC관련)

ThreadPool

병렬 작업 처리가 많아지면 스레드 개수가 증가되고 그에 따른 스레드 생성과 스케쥴링으로 인해 CPU가 바빠져 메모리 사용량이 늘어난다. 따라서 애플리케이션의 성능이 저하된다. 갑작스런 병렬 작업의 폭증으로 인한 스레드의 폭증을 막으려면 ThreadPool을 사용해야 한다. 스레드풀은 작업 처리에 사용되는 스레드를 제한된 개수만큼 정해 놓고 작업 Queue에 들어오는 작업들을 하나씩 스레드가 맡아 처리한다.

 

자바는 스레드풀을 생성하고 사용할 수 있도록 java.util.concurrent 패키지에서 ExecutorService 인터페이스와 Executors 클래스를 제공하고 있다. Executors의 다양한 정적 메소드를 이용해서 ExecutorService 구현 객체를 만들 수 있는데, 이것이 바로 스레드풀이다. 다음 그림은 ExecutorService가 동작하는 방식을 보여준다.

스레드풀 생성 및 종료

ExecutorService 구현 객체는 Executors 클래스의 다음 두 가지 메소드 중 하나를 이용해서 간편하게 생성할 수 있다.

  • newCachedThreadPool : 초기 스레드 수 0, 코어 스레드 수 0, 최대 스레드 수 Interval.MAX_VALUE
  • newFixedThreadPool(int nThreads) : 초기 스레드 수 0, 코어 스레드 수 nThreads, 최대 스레드 수 nThreads

두 메소드를 사용하지 않고 코어 스레드 개수와 최대 스레드 개수를 설정하고 싶다면, 직접 ThreadPoolExecutor 객체를 생성하면 된다. 사실 위 두 가지 메소드도 내부적으로 ThreadPoolExecutor 객체를 생성해서 리턴한다.

 

main() 메소드가 실행이 끝나도 애플리케이션 프로세스는 종료되지 않는다. 애플리케이션을 종료하려면 스레드풀을 종료시켜 스레드들이 종료 상태가 되도록 처리해주어야 한다. ExecutorService는 종료와 관련해서 다음 세 개의 메소드를 제공하고 있다.

  • shutdown() : 현재 처리 중인 작업 뿐만 아니라 작업 큐에 대기하고 있는 모든 작업을 처리한 뒤에 스레드풀을 종료시킨다.
  • shutdownNow() : 현재 작업 처리 중인 스레드를 interrupt 해서 작업 중지를 시도하고 스레드 풀을 종료시킨다. 리턴 값은 작업 큐에 있는 미처리된 작업(Runnable)의 목록이다.
  • awaitTermination(timeout, unit) : shutdown 메소드 호출 이후 모든 작업 처리를 timeout 시간 내 완료하면 true를 리턴하고, 완료하지 못하면 작업 처리 중인 스레드를 interrupt하고 false를 리턴한다.

작업 생성과 처리 요청

하나의 작업은 Runnable 또는 Callable 구현 클래스로 표현한다. 둘의 차이는 작업 처리 완료 후 리턴값이 있느냐 없느냐이다. Runnable이 없고 Callable이 있다.

  • void execute(Runnable command) : Runnable을 작업 큐에 저장. 작업 처리 결과를 받지 못함
  • 여러 Future<V> submit() : Runnable 또는 Callable을 작업 큐에 저장. 리턴된 Future를 통해 작업 처리 결과를 얻을 수 있음

execute()는 작업 처리 도중 예외가 발생하면 스레드가 종료되고 해당 스레드는 스레드풀에서 제거된다. 따라서 스레드풀은 다른 작업 처리를 위해 새로운 스레드를 생성한다. 반면에 submit() 은 작업 처리 도중 예외가 발생하더라도 스레드는 종료되지 않고 다음 작업을 위해 재사용된다. 그렇기 때문에 가급적이면 스레드의 생성 오버헤더를 줄이기 위해 submit()을 사용하는 것이 좋다.

Future

Future 객체는 작업 결과가 아니라 작업이 완료될 때까지 기다렸다가 (지연했다가 = blocking되었다가) 최종 결과를 얻는데 사용된다. 그래서 Future를 지연 완료 (pending completion) 객체라고 한다. Future의 get 메소드를 호출하면 스레드가 작업을 완료할 때까지 블로킹 되었다가 작업을 완료하면 처리 결과를 리턴한다.

 

Future를 이용한 blocking 방식의 작업 완료 통보에서 주의할 점은 작업을 처리하는 스레드가 작업을 완료하기 전에는 get() 메소드가 블로킹 되므로 다른 코드를 실행할 수 없다는 것이다. 그렇기 때문에 get() 메소드를 호출하는 스레드는 새로운 스레드이거나 스레드풀의 또 다른 스레드가 되어야 한다.

Future 객체는 작업 결과를 얻기 위한 get() 메소드 이외에도 다음과 같은 메소드를 제공한다.

  • cancel(boolean mayInterruptIfRunning) : 작업 처리가 진행 중일 경우 취소시킴
  • isCancelled() : 작업이 취소되었는지 여부
  • isDone() : 작업 처리가 완료되었는지 여부

CompletionService

작업 요청 순서대로 작업 처리가 완료되는 것은 아니다. 작업의 양과 스레드 스케쥴링에 따라서 먼저 요청한 작업이 나중에 완료되는 경우도 발생한다. 여러 개의 작업들이 순차적으로 처리될 필요성이 없고, 처리 결과도 순차적으로 이용할 필요가 없다면 작업 처리가 완료된 것부터 결과를 얻어 이용하면 된다. 스레드풀에서 작업 처리가 완료된 것만 통보받는 방법도 있는데, CompletionService를 이용하는 것이다. CompletionService는 처리 완료된 작업을 가져오는 poll()take() 메소드를 제공한다.

ExecutorService executorService = Executors.newFixedThreadPool(
  Runtime.getRuntime().availableProcessors()
};
CompletionService<V> completionService = new ExecutorCompletionService<V>(executorService);

// completionService에 task 제출
completionService.submit(task1);
completionService.submit(task2);

// ThreadPoolExecutor를 통해 completion Future 처리 task 제출
executorService.submit(new Runnable() {
  @Override
  public void run() {
    while(true) {
      try {
        Future<V> future = completionService.take();
        V value = future.get();
        // 추후 처리
    } catch (Exception e) {
        break;
    }
  }
});

// ExecutorService 내에 있는 Thread에 Exception
executorService.shutDownNow(); // 위의 While 문 break 처리

Practical 모던 자바

  • 병행 (Concurrency) : 하나의 CPU 코어에서 SW적인 기법으로 동시에 여러 작업을 교차하며 실행하는 것
  • 병렬 (Parallel) : 여러 개의 코어에 작업을 배분해서 동시에 작업을 실행하는 것
  • 분산 (Distribution) : 여러 대의 원격 서버 혹은 물리적인 장비에 작업을 분산시켜서 처리하는 개념

엄밀히 말하면, 멀티 코어 환경에서는 병행과 병렬 작업이 동시에 일어난다. 작업이 여러 코어로 배분될 뿐만 아니라 하나의 코어에서 여러 작업이 병행해서 동작한다.

 

JDK 5에서 발표된 concurrent API는 고수준 API로 개발자들이 보다 쉽게 병렬 프로그래밍을 할 수 있도록 사전에 많은 부분을 미리 정의해 놓아서 명확하고 편리하다.

  • java.util.concurrent : concurrent 프로그래밍에서 가장 많이 사용하는 utility 클래스가 포함된 패키지, 비동기 태스크 실행과 스레드의 생성/관리를 위한 다양한 인터페이스와 클래스가 제공된다.
  • java.util.concurrent.atomic : 병렬 프로그래밍에서 데이터에 대한 정합성을 확보하기 위하여 원자적 변수 선언 기능을 제공하는 패키지
  • java.util.concurrent.locks : concurrent API에서 객체에 대한 잠금, 대기 기능을 제공하는 유틸리티 클래스와 인터페이스를 제공

concurrent 패키지

java.util.concurrent 내의 많은 인터페이스와 클래스는 6개로 구분할 수 있다.

  • 실행자 (Executors) : concurrent API에서 작업을 실행하는 역할. 새로운 스레드를 생성하기도 하고 기존에 생성된 스레드를 재활용하기도 하며, 작업을 순차적으로 혹은 병렬적으로 실행시키기도 한다. 추가적으로 스케쥴링이 가능해서 주기적으로 작업을 실행시킬 수도 있고 현재가 아닌 미래의 특정 시점에 작업을 실행시킬 수도 있다.
  • 큐 (Queue) : 멀티 스레드 환경에서 안정성을 보장하는 큐
  • 타이밍 (Timing) : 타임아웃 기능을 통해 불필요한 스레드, 좀비나 데드락 스레드를 관리하고 SW를 안정적으로 동작하도록 한다. 이 기능을 구현하기 위해 시간에 대한 정확도를 높였으며 concurrent API에서는 나노초 단위까지 정밀한 제어가 가능하다.
  • 동기화 (synchronizers) : 동기화와 관련한 5가지 유형의 유틸리티를 제공하고 있다.
  • 컨커런트 컬렉션(Concurrent Collection) : Concurrent API 환경에서 List 혹은 Map형 데이터를 다루기 위해 제공하는 인터페이스와 클래스들
  • 메모리 정합성 관련 속성

Executors

Executors에서 제공하는 여러 ExecutorService 구현체들 정리

newSingleThreadExecutor

  • 오직 하나의 스레드로 처리하며 나머지 스레드 생성 요청은 현재 스레드가 종료될 때까지 대기한다.
  • 현재 메인 클래스에서 오직 하나의 스레드로 작업을 수행할 때 안전하게 사용할 수 있는 장점은 있지만, 여러 개의 스레드를 생성할 수 없다는 문제가 있다.

newFixedThreadPool

  • 입력 파라미터로 생성할 스레드 풀의 크기를 정의한다. 스레드 풀의 크기 내에서 스레드가 생성되어 병렬 처리된다.
  • 스레드 풀의 크기를 넘으면 여유가 생길 때까지 대기한다.

newCachedThreadPool

  • 멀티 스레드 처리를 위한 스레드 풀을 생성하되 기존에 생성한 스레드를 가능한 한 재사용한다.
  • 멀티 스레드 기반으로 동작한다는 점에서 newFixedThreadPool과 동일하지만, 등록된 스레드를 모두 한 번에 실행하며 동시 처리에 대한 개수 제한이 없다.

newWorkStealingPool

  • 스레드 풀을 생성하며, 실행되는 HW의 사용 가능한 모든 CPU의 코어를 쓰도록 병렬 처리 레벨을 설정한다.
  • 해당 HW의 자원을 모두 선점하려고 하기 때문에 다른 프로세스 혹은 애플리케이션의 성능에 영향이 크다.

unconfigurableExecutorService

  • 메소드의 입력 파라미터로 반드시 ExecutorService 객체를 전달해야 한다. 그리고 해당 객체를 표준 ExecutorService 객체로 위임해서 결과를 리턴한다.
  • 이 메소드는 ExecutorService를 구현한 여러 클래스의 기능 중 ExecutorService의 메소드만 호출하고 나머지 기능을 사용하지 못하도록 제한할 필요가 있을 때 사용한다.

ScheduledExecutorService 구현체들 정리. ScheduledExecutorService는 주기적으로 task를 실행시켜야 하거나 특정 시간에 task가 실행되도록 예약하는 기능을 구현할 때 주로 사용한다.

newScheduledThreadPool

  • 스레드가 특정 시간 이후, 혹은 일정 시간 간격으로 실행되도록 하는 스레드 풀을 생성한다.
  • 스레드 풀의 크기를 지정한다.

newSingleThreadScheduledExecutor

  • 스레드가 특정 시간 이후, 혹은 일정 시간 간격으로 실행되도록 하는 스레드 풀을 생성한다.
  • 하나의 스레드만 실행되며 나머지 스레드는 실행 시간이 지정되더라도 현재 실행 중인 스레드가 종료될 때까지 대기한다.

unconfigurableScheduledExecutorService

  • 메소드의 입력 파라미터로 반드시 ScheduledExecutorService 객체를 전달해야 한다. 그리고 해당 객체를 표준 ScheduledExecutorService 객체로 위임해서 결과를 리턴한다.
  • ScheduledExecutorService를 구현한 여러 클래스의 기능 중 ExecutorService의 메소드만을 호출하고 나머지 기능을 제한할 필요가 있을 때 사용한다.

웹 프로그래밍을 하면서 스케쥴링 작업 때문에 고민해본 개발자라면 concurrent API의 ScheduledExecutorService 기능을 이용해서 간편하게 문제를 해결할 수 있고, 별도로 쿼츠와 같은 라이브러리를 이용하지 않더라도 구현이 가능할 것이다.

포크/조인 프레임워크

자바 5에서 처음 제공한 concurrent API를 최종적으로 완성한 것은 자바 7에서 제공한 포크/조인 프레임워크이다. 자바 6까지의 concurrent API는 스레드를 생성하는 것에 초점을 맞췄다면 자바 7에서는 스레드와의 연관성을 관리할 수 있는 기능과 함께 효율적으로 HW 자원을 활용할 수 있는 방법을 제공한다.

 

포크/조인 프레임워크는 엄밀히 말하면 아주 새로운 기능이 아니라 ExecutorService 인터페이스를 구현한 클래스이다. 이 프레임워크의 주된 목적은 멀티 프로세서 혹은 멀티 코어를 가지고 있는 HW 자원을 최대한 효율적으로 활용해서 병렬 처리가 가능하도록 하는 것이다.

  • 포크 (Fork) : 다른 프로세스 혹은 스레드(task)를 여러 개로 쪼개서 새롭게 생성한다는 의미
  • 조인 (Join) : 포크해서 실행한 프로세스 혹은 스레드(태스크)의 결과를 취합한다는 의미

개발자가 분할 가능 여부에 대한 코드를 작성해야 하며, 정의해 놓지 않았다면 하위 작업으로 분할되지 않고 하나의 큰 작업으로 실행된다. 이러한 경우 concurrent API의 장점과 포크/조인 프레임워크의 장점을 사용할 수 없다. 작업을 분할할 수 없을 경우에는 순차 처리를, 분할이 가능하면 별도의 태스크로 나눈 후 병렬 처리하는 방식을 택해야 한다.

Future / CompletableFuture

Future 인터페이스에 대한 자바 API 설명을 살펴보면 "비동기 연산의 결과를 표현한다"고 정의하고 있다. Future에서는 isDone과 isCancelled 메소드를 사용해서 비동기 연산이 종료 혹은 취소됐는지 확인할 수 있으며 get 메소드를 호출하면 결과값을 응답 받을 때까지 대기한다. List<Future> 등을 사용하면 실행시킨 모든 비동기 연산이 끝날 때까지 대기시킬 수도 있다. Java8에서 CompletableFuture가 추가되었다. Future 인터페이스로 연산의 완료 여부를 판단하고 결괏값을 얻어노는 것만으로는 충분하지 않기 때문이다. CompletableFuture에서는 비동기 연산 간의 관계를 정의하거나 연산 결과를 수집, 조합하는 등의 작업이 추가되었다. CompletableFuture의 장점은 다음과 같다.

  • 스레드의 선언 없이도 비동기 연산 작업을 구현할 수 있고 병렬 프로그래밍이 가능하다.
  • 람다 표현식과 함수형 프로그래밍을 사용할 수 있어서 코드의 양을 현저히 줄일 수 있다.
  • 파이프라인 형태로 작업들을 연결할 수 있어서, 비동기 작업의 순서를 정의하고 관리할 수 있다.

CompletableFuture의 기본이 되는 메소드들은 다음과 같다.

  • runAsync : Runnable 구현체를 이용해서 비동기 연산 작업을 하기 위한 새로은 CompletableFuture 객체를 리턴한다.
  • supplyAsync : Supplier 함수형 인터페이스의 구현체를 이용해서 비동기 연산 작업을 위한 새로운 CompletableFuture 객체를 리턴한다.
  • thenAccept : 현재 단계가 성공적으로 종료되었을 경우, 메소드의 파라미터로 전달된 Consumer 함수형 인터페이스의 구현체를 실행하기 위한 CompletionStage 객체를 리턴한다.
  • thenRun : 현재 단계가 성공적으로 종료되었을 경우, 메소드의 파라미터로 전달된 Runnable 구현체를 실행하기 위한 CompletionState 객체를 리턴한다.
  • complete : 현재 태스크를 종료하며 만일 태스크가 동작중이라면, get 메소드와 동일하게 종료될때까지 대기하고, 최종 태스크 결과를 리턴한다

Concurrent Collection, 원자적 변수

초기 자바에서 제공한 Vector와 Hashtable은 멀티 스레드 환경에서 데이터의 정합성을 너무나 강조한 나머지 synchronized 키워드를 빈번하게 사용했고, 그로 인해 성능에서 문제가 발생하였다. 그래서 곧바로 JDK 1.2에서 List와 HashMap을 통해 동기화 키워드를 제거한, 그리고 좀 더 속도가 빠른 컬렉션들을 제공했다. (List, HashMap 등등) Concurrent API에서는 concurrent collection 이라는 병렬 프로그래밍에 특화된 컬렉션을 제공한다.

  • BlcokingQueue : 선입 선출 데이터 구조를 가지는 큐이다. 데이터를 관리하는 용도로 사용하지만 주로 많이 응용되는 분야는 클라이언트로부터 요청 받은 데이터를 순차적으로 처리하고 관리하기 위한 대기 큐 용도이다.
  • ConcurrentMap : java.util.Map의 하위 인터페이스로 conccurrent API에 대응하기 위한 기능을 추가했다. 이 인터페이스를 구현한 대표 클래스는 ConcurrentHashMap이며 컬렉션 프레임워크의 HashMap에 대응된다.
  • ConcurrentNavigableMap : TreeMap에 대응

원자적 변수들은 concurrent.atomic 패키지에 정의되어 있으며 '하나의 변수 선언으로 멀티 스레드 프로그래밍에서 안정성을 보장하는 클래스들을 모아 놓은 작은 toolkit'으로 설명되어 있다. 원래 변수 하나에 대해 멀티스레드 환경에서의 안정성을 보장하려면 변수에 접근하는 메소드에서 객체 lock (synchronized)를 걸어야 하지만, AtomicXXX를 사용하면 접근 메소드에서 lock을 걸지 않아도 된다.

JDK (11 기준)

Object에 존재하는 Thread 관련 메소드

notify

객체 모니터에서 대기하고 있는 하나의 thread를 깨운다. 만약 여러 thread가 대기하고 있다면, 랜덤으로 그 중 하나를 깨우게 된다. 반대로 wait method가 호출되면 해당 thread는 객체 모니터에 대기하게 된다.
깨어난 thread는 현재의 thread가 객체에 대한 lock을 포기할때까지 활동할 수 없다. 깨어난 thread는 다른 thread들과 동기화 경쟁을 하게 된다.

notify() 메소드는 객체 모니터의 owner thread에 의해서만 호출되어야 한다. owner thread의 조건은 다음 세 가지 중 하나를 만족해야 한다.

  • 객체 인스턴스의 (non-static) synchronized method를 실행하고 있을 것
  • 객체 내에서 synchornized block을 실행하고 잇을 것
  • Class 타입의 객체인 경우, 해당 class의 synchornized static method를 실행하고 있을 것

특정 시점에 오직 한 thread만이 object monitor를 획득할 수 있다.

Executor, ExecutorService

Executor

public interface Executor {

  void execute(Runnable command);

}

제출된 Runnable task를 수행하는 객체. 이 interface는 Task 제출과 스레드 실행 메커니즘(스레드 사용, 스케쥴링, 각 task가 어떻게 실행될지 등등)을 분리하는 방법을 제공한다. 일반적으로 Thread를 직접 만들어 start() 메소드를 실행시키는 것 보다 Executor를 만들어 task 집합을 제출하는 것이 낫다.

그러나 Executor 인터페이스는 task 실행이 비동기여야 함을 강요하지 않는다. 가장 간단한 구현체로는 이것을 생각해볼 수 있다.

class DirectExecutor implements Executor {
  public void execute(Runnable r) {
    r.run();
  }
}

전형적으로는 task를 제출한 thread가 아니라 새로운 Thread에서 Runnable task를 수행하게 된다.

class ThreadPerTaskExecutor implements Executor {
  public void execute(Runnable r) {
    new Thread(r).start();
  }
}

많은 Executor 구현체들은 어떻게, 그리고 언제 task를 스케쥴링할지 한계를 부과한다. 다음은 예시이다.

class SerialExecutor implements Executor {
  final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
  final Executor executor;
  Runnable active;

  SerialExecutor(Executor executor) {
    this.executor = executor;
  }

  public synchronized void execute(final Runnable r) {
    tasks.offer(new Runnable() {
      public void run() {
        try {
          r.run();
        } finally {
          scheduleNext();
        }
      }
    });

    if (active == null) {
      scheduleNext();
    }
  }

  protected synchronized void scheduleNext() {
    if ((active = tasks.poll()) != null) {
      executor.execute(active);
    }
  }
}

java.util.concurrent 에서 제공하는 Executor 구현체들은 Executor 인터페이스를 확장하고 있는 ExecutorService의 하위 구현체들이다. ThreadPoolExecutor는 확장된 스레드 풀 구현을 제공한다. Executors 클래스는 위의 Executors를 위한 편리한 팩토리 메소드를 제공한다.

ExecutorService

Executor에서 설명되었던 것처럼 확장된 기능을 가지고 있다. 스레드풀 종료, task 제출 등의 기능을 가지고 있다.

public interface ExecutorService extends Executor {

  void shutdown();

  List<Runnable> shutdownNow();

  boolean isShutdown();

  boolean isTerminated();

  boolean awaitTermination(long timeout, TimeUit unit)
    throws InterruptedException;

  <T> Future<T> submit(Callable<T> task);

  <T> Future<T> submit(Runnable Task, T result);

  Future<?> submit(Runnable task);

  <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    throws InterruptedException;

  <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
    long timeout, TimeUnit unit) throws InterruptedException;

  <T> T invokeAny(Collection<? extends Callable<T>> tasks)
    throws InterruptedException, ExecutionException;

  <T> T invokeAny(Collection<? extends Callable<T>> tasks,
    long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

}

Main Thread

본격적으로 알아보기 전에 main thread 와 그 외 thread들에 대해 짧게 정리해 보았다.

  • 자바의 모든 애플리케이션은 main thread가 main() 메소드를 실행하면서 시작한다
  • main thread는 필요에 따라 다른 thread들을 만들어 병렬로 코드를 실행할 수 있다.
  • main thread가 먼저 종료되더라도 다른 thread들이 있다면 java process는 종료되지 않는다.

여기서 표현된 '다른 thread'는 영어로 'thread of execution' 이라 불리고 있다. 앞으로는 '작업 스레드'라고 부르겠다.

Thread.java 주석에 따르면, 다음 두 가지 경우가 발생하기 전까지 main thread 혹은 non-daemon 작업 thread를 계속해서 실행한다

  • Runtime 객체의 exit 메소드가 호출되었고 security manager가 해당 명령을 승인했을때
  • daemon thread가 아닌 모든 thread가 죽었을때 (daemon에 대해서는 후술하고 있다)
    • 죽었다는 뜻은 run() 메소드 내부에서 return이 호출되었거나 (명시적 or 암시적)
    • run() 메소드 내부에서 Exception이 발생한 경우를 말한다.

Thread 만들기

Thread 만드는 법

좋다. 이제 Thread를 만들어보자. Thread를 만드는 방법 역시 Thread.java 에서 자세히 설명하고 있다. Thread를 만드는 방법에는 2가지가 있다.

  1. Thread의 하위 클래스를 만들어 run() 메소드를 override 하는 것. 하위 클래스의 인스턴스는 메모리를 할당 받고 시작될 것이다.
/* Java docs에 나와 있는 예시 */
class PrimeThread extends Thread {
  long minPrime;
  PrimeThread(long minPrime) {
    this.minPrime = minPrime;
  }

  public void run() {
    // compute primes larger than minPrime
  }
}

// PrimeThread를 만들고 시작하기
PrimeThread p = new PrimeThread(143);
p.start();
  1. 두 번째 방법은 Runnable 인터페이스를 구현하는 것이다. 구현체의 인스턴스는 메모리를 할당받고 시작될 수 있다.
/* Java docs에 나와 있는 예시 */
class PrimeRun implements Runnable {
  long minPrime;
  PrimeRun(long minPrime) {
    this.minPrime = minPrime;
  }

  public void run() {
    // compute primes larger than minPrime
  }
}

PrimeRun p = new PrimeRun(143);
(new Thread(p)).start();

위 예시에서는 Runnable 을 받는 Thread 생성자만 있는 것처럼 보이지만 실제로는 꽤나 많은 Thread 생성자들도 존재한다. 한 번 살펴보도록 하자.

private 생성자

여러 public 생성자들은 최종적으로 private 생성자를 호출하고 있다. 때문에 private 생성자 코드를 뜯어보자.

private Thread(ThreadGroup g, Runnable target,
    String name, long stackSize, AccessControlContext acc,
    boolean isheritThreadLocals)
  • ThreadGroup : 스레드 그룹은 스레드의 집합을 표현한다. 한 스레드 그룹은 내부에 스레드들을 가질 수도 다른 스레드 그룹들을 가질 수 있다. 특정 스레드는 본인이 속한 스레드 그룹에 접근할 수 있지만, 그 스레드의 부모 스레드의 스레드 그룹이나 (... 말이 어렵다 ㅋㅋ) 다른 스레드 그룹에 접근할 수 없다. main thread 역시 최상위 Thread Group에 포함되어 있다.
  • Runnable : 스레드가 수행해야할 로직을 가지고 있는 인터페이스이다.
  • name : 각 스레드에는 이름을 부여할 수 있으며, 여러 스레드는 같은 하나의 이름을 가질 수도 있다.
  • stackSize : 스레드에 할당되는 메모리의 크기이다. 0을 넣게 되면 해당 파라미터를 무시한다.
  • AccessControlContext : 시스템 자원에 대한 권한 결정을 하는데 사용된다.
  • isheritThreadLocals : 상속받은 Thread가 있는지에 대한 여부를 표시한다.

Thread가 생성되는 과정을 간단히 살펴보면 이름은 그대로 null check 이후 (NPE가 발생할 수 있음) 그대로 지정되고 Thread를 만드는 parent Thread와 SecurityManager를 가져와 ThreadGroup을 찾게 된다. 그 후 ThreadGroup을 이용해 권한 체크 등을 진행하고 필드들을 집어 넣는데...

daemon 여부와 우선순위, ClassLoader는 parent의 것을 상속받고 stackSize 정도만 외부에서 받은대로 집어 넣게 된다.