Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Item 80&81. concurrent 패키지 제대로 이해하기 #36

Open
ChoiSeEun opened this issue Jan 5, 2024 · 0 comments
Open

Item 80&81. concurrent 패키지 제대로 이해하기 #36

ChoiSeEun opened this issue Jan 5, 2024 · 0 comments
Assignees

Comments

@ChoiSeEun
Copy link
Contributor

section: 11장

  • 아이템 80. 스레드보다는 실행자, 태스크, 스트림을 애용하라
  • 아이템 81. wait와 notify보다는 동시성 유틸리티를 애용하라

🍵 서론

item80item81 에서 한 가지의 핵심만 뽑으라고 한다면, java.util.concurrent 이다. 교재에서는 이 패키지를 아래 세 범주로 나눠서 말하고 있다.

  • 실행자 프레임워크 (Executor Framework)
  • 동시성 컬렉션 (Concurrent Collection)
  • 동기화 장치 (Synchronizer)

다행히도(?) 지난 스터디에서 정리한 내용이 많아서, 어렵지 않게 이해했을 것이라 생각한다. 복습 겸 각 범주별 핵심 내용에 대해서 살펴보자!

🌒 본론

concurrent 패키지

java.util.concurrent 는 Java 5에서 추가된 패키지이며, 멀티 스레드 환경에서 유용한 클래스들을 담고 있다. 패키지에서 제공하는 주요 기능은 아래와 같다.

  • Locks : 상호 배제를 사용할 수 있는 클래스를 제공
  • Atomic : 동기화가 되어있는 변수를 제공
  • Executors : 스레드 풀 생성, 스레드 생명주기 관리, Task 등록과 실행 등을 간편하게 처리
  • Queue : thread-safe한 FIFO 큐 제공
  • Synchronizers : 특수한 목적의 동기화를 처리하는 5개의 클래스 제공
    • Semaphore
    • CountDownLatch
    • CyclicBarrier
    • Phaser
    • Exchanger

1️⃣ 실행자 프레임워크

어찌됐든, 멀티 쓰레드 환경을 좀 더 편하게 만들어준다는 건 알 것 같다.

ExecutorService exec = Executors.newSingleThreadExecutor();
exec.execute(runnable);
exec.shutdown();

실제 코드를 봐도, 위 3줄만으로 (1) 작업 큐를 생성 (2) 태스크 실행 (3) 종료 할 수 있고 코드도 직관적이다.

실행자 서비스는 단순히 태스크를 실행하거나 종료하는 것 이외에도,

  • 특정 태스크가 완료되기를 기다리기
  • 태스크 중 하나 혹은 모든 테스크가 완료되기를 기다리기
  • 실행자 서비스가 종료하기를 기다리기
  • 완료된 태스크들의 결과를 차례로 받기
  • 태스크를 특정 시간에 혹은 주기적으로 실행하게 하기
    와 같은 기능들을 수행할 수 있다.
public interface ExecutorService extends Executor {
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

참고로 ExecutorService 가 상속받고 있는 Executor 는 등록된 작업을 실행하는 책임만 가지는 인터페이스이다. 일반적으로, ExecutorServiceExecutor 를 상속 받아서 실행의 책임을 가진다고 보면 된다.

public interface Executor {
   void execute(Runnable command);
}

그리고 ExecutorService 를 생성하는 Executors 는 Thread를 쉽게 다룰 수 있도록 도와주는 팩토리 클래스이다. 위의 예시와 같은 단일 스레드 뿐아니라, 스레드 풀 또한 손쉽게 생성할 수 있다.

public class Executors {
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
}

Thread Pool

여러 개의 스레드를 사용해야 한다면 스레드 풀 을 사용하는 것이 좋다. 과도한 스레드의 사용은 성능 저하의 원인이 되기 때문에, 일정 수의 스레드를 재사용하면서 성능과 리소스를 효율적으로 관리할 수 있다.

➀ CachedThreadPool
스레드를 캐싱하는 스레드 풀이며, 작은 프로그램이나 가벼운 서버에 유리하다. 여기서 말하는 캐싱은 스레드를 검사한다는 뜻으로, 60초 동안 작업이 없으면 풀에서 제거한다. 또한, 요청 받은 태스크들이 큐에 쌓이지 않고 즉시 스레드에 위임되어 실행되므로 무거운 서버에는 적합하지 않다.

➁ FixedThreadPool
고정된 개수를 가지는 스레드 풀로, 무거운 프로덕션 서버에 적합하다. 주로 멀티 스레드 환경의 테스트도 해당 스레드 풀을 사용한다.

Runnable & Callable

두 가지 인터페이스는 자바에서 스레드를 사용하기 위해 구현해야 하는 인터페이스이다. 두 가지 모두를 구현해야하는 것은 아니고, 둘 중에 한 가지만 구현하면 스레드를 사용할 수 있다.

일반적으로 많이 사용하는 것은 Runnable 이며, 익명 객체 및 람다를 사용할 수 있다는 특징이 있지만 값을 반환할 수 없다는 단점 때문에 Callable 를 사용하기도 한다.

@FunctionalInterface
public interface Runnable {
    public abstract void run();
}

@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}

fork-join 태스크

작은 하위 태스크로 나뉠 수 있고, ForkJoinPool을 구성하는 스레드들이 이 태스크들을 처리하며, 일을 먼저 끝낸 스레드는 다른 스레드의 남은 태스크를 가져와 대신 처리할 수 있다.

fork-join 은 병렬 처리를 위한 공통된 모델로, 분할 정복 알고리즘을 통해 재귀적으로 처리된다. 자바는 자바7부터 이를 지원하며, 해당 모델에 적합한 작업이라면 높은 처리량과 낮은 지연시간을 자랑한다. 이해를 돕기 위해, 100개의 랜덤한 숫자를 합산하는 프로그램을 fork-join 으로 처리하는 코드를 보자.

import java.util.*;

public class SumNumberTask extends RecursiveTask<Integer> {
    private final List<Integer> numbers;

    public SumNumberTask(List<Integer> numbers) {
        this.numbers = numbers;
    }

    @Override
    protected Integer compute() {
		// 25개 이하이면 실제 작업 진행 
        if (numbers.size() <= 25) {
            return numbers.stream().mapToInt(Integer::intValue).sum();
        }
	    // 25개 이상이면 subtask 생성 후 task재귀 호출 
        List<SumNumberTask> subTasks = partitioningTask(this.numbers);

        return ForkJoinTask.invokeAll(subTasks)
                .stream()
                .mapToInt(ForkJoinTask::join)
                .sum();
    }

	// 입력받은 리스트를 절반으로 나누고 2개의 task를 생성해서 반환
    private List<SumNumberTask> partitioningTask(List<Integer> numbers) {
        List<Integer> sharedFrist = numbers.subList(0, numbers.size() / 2);
        List<Integer> sharedSecond = numbers.subList(numbers.size() / 2, numbers.size());
        SumNumberTask firstTask = new SumNumberTask(sharedFrist);
        SumNumberTask secondTask = new SumNumberTask(sharedSecond);
        List<SumNumberTask> subtasks = new ArrayList<>();
        subtasks.add(firstTask);
        subtasks.add(secondTask);
        return subtasks;
    }
    private static List<Integer> makeRandomNumbers() {
        return new Random().ints(1000, 1, 100)
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        List<Integer> numbers = makeRandomNumbers();
        int sum = numbers.stream().mapToInt(Integer::intValue).sum();
        System.out.println(sum + "<== single thread");

        SumNumberTask sumNumberTask = new SumNumberTask(numbers);
        ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
        ForkJoinTask<Integer> submit = forkJoinPool.submit(sumNumberTask);
        Integer integer = submit.get();
        System.out.println(integer + "<== multi thread");
    }
}

2️⃣ 동시성 컬렉션

이는, 우리가 알고 있는 컬렉션에 동시성을 가미한 컬렉션을 말한다. 동시성을 위해 내부에서 동기화를 수행하기 때문에 동시성을 무력화할 수 없을 뿐 더러, 외부에서 락을 추가로 사용하면 오히려 속도가 느려지게 된다.

이런 특성 때문에 상태 의존적 수정 메서드들이 추가되었다. 가장 대표적인 메서드가 putIfAbsent(key,value) 이다. 주어진 키에 매핑된 값이 있을 때와 없을 때의 반환값이 다르다. 기존 값이 있었다면 그 값을 반환하고, 없었다면 null을 반환한다. 또한, 주어진 키에 매핑되는 값이 없을 때만 새로운 value값을 집어넣는다.

public static String intern(String s){
	String result = map.get(s);
	if(result==null){
		result = map.putIfAbsent(s,s);
		if(result==null)
			result = s;
	}
	return result;
}

동시성 컬렉션은 동기화 컬렉션 보다 성능이 좋다. 그 예로, Collections.synchronizedMap 보다 ConcurrentHashMap 을 사용하는 것이 훨씬 성능이 좋다.

일부의 컬렉션들은 작업이 성공적으로 완료될 때까지 기다리도록 확장되었다. 가장 대표적인 예시가 BlockingQueue 이다.
이프로가 들려주는 BlockingQueue 이야기

하나 이상의 생산자 스레드가 작업을 큐에 추가하고, 하나 이상의 소비자 스레드가 큐에 있는 작업을 처리하는 형태인 작업 큐로 쓰기에 적합하다.

3️⃣ 동기화 장치

동기화 장치는, 스레드가 다른 스레드를 기다릴 수 있게 하여 서로 작업을 조율할 수 있도록 도와준다. 위에서 언급한 5개의 SynchronizersCountDownLatchSemaphore 가 가장 자주 사용되며, 가장 강력한 동기화 장치는 Phaser 이다. CyclicBarrierExchanger 는 비교적 덜 사용된다.

각 동기화 장치는 각각의 특징을 가지고 있고, 여기서는 특징에 대해서만 간단하게만 정리해보려고 한다. 예시코드가 아래 링크를 참고하면 좋겠다 !
Java concurrent 패키지의 동기화 장치

CountDownLatch

  • 생성할 때 1 이상의 count 값을 인자값으로 받음
  • await() 를 호출한 스레드는 대기 상태로 들어감
  • countDown() 메소드가 처음 설정한 count 만큼 호출되여 count 가 0이 되면 await() 호출한 스레드의 대기 상태가 해제
  • 0이 된 latch는 재사용 불가

Semaphore

  • 한 공유 자원 또는 연산을 점유하는 스레드의 개수를 제한할 때 사용
  • 생성 시 2개의 인자를 지정할 수 있음
    • permit : 점유할 수 있는 스레드의 최대 갯수
    • fair : permit 획득 순서 지정 (true이면 FIFO)
  • acquire()release() 로 lock을 획득하고 해제

Phaser

  • CyclicBarrier와 비슷한 동작
    • 모든 스레드가 대기 상태에 들어가면 대기 상태를 해제
  • 동기화에 참여할 스레드의 수가 동적이고 재사용이 가능
  • 용어
    • register : 동기화 과정에 참여할 스레드의 갯수를 추가
      • parties : 처음 Phaser를 생성할 때 동기화에 참여하는 스레드의 수
      • 처음 생성할 때 1개를 생성했더라도, register()를 이용해서 동적으로 스레드의 갯수를 조정 가능
    • arrive : 각 스레드가 대기 상태에 들어가는 지점에 도달
    • phase : 모든 스레드가 arrive 가 되고 대기 상태가 풀리는 과정
      • phase가 끝나면 Phaser 내부의 phase 값이 하나씩 증가

CyclicBarrier

  • CountDownLatch와 비슷
    • 생성할 때 1 이상의 count 값을 인자값으로 받음
    • await() 를 호출한 스레드는 대기 상태로 들어감
  • 다른 스레드가 전부 대기 상태가 되었을 때 모든 스레드의 대기 상태를 해제
    • 즉, 처음 설정한 count 만큼 await() 가 호출되면 대기 상태가 해제됨
  • CyclicBarrier 는 재사용이 가능

Exchanger

  • 두 스레드 사이에서 객체를 교환하는데 사용
  • 한 쪽의 스레드에서 교환 메소드를 호출 했을 때, 다른 쪽의 스레드가 교환 메소드를 호출한 상태가 아니라면 그 스레드가 메소드를 호출할 때까지 대기
  • generic 에 교환할 객체를 지정하고 생성

🍃 결론

늘 느끼는 거지만, 멀티 스레드는 참 어렵고 복잡하다 ! 하지만 이제는 모두들 익숙하게 사용할 수 있기를 바라며 ...


reference

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

1 participant