본문 바로가기

Java

[Java] BlockingQueue로 생산자-소비자 문제 해결하기

1. 개요

본 포스팅은 김영한 강사님의 인프런 강의 "자바 고급 1편" 중 생산자-소비자 문제에 대해 정리한 포스팅입니다.

2. 생산자-소비자 문제?

 생산자 스레드와 소비자 스레드가 존재한다. 두 종류의 스레드는 critical section 내부의 큐에 접근하여 데이터를 역할에 맞춰 생산 및 소비하고자 한다. 하지만 큐의 길이는 한정되어 있으므로 다음의 문제가 발생할 수 있다.

  • 생산자가 너무 빠른 생산을 하여 큐가 가득 찼을 경우 생산자 스레드는 큐에 빈 공간이 생길 때 까지 기다려야 한다.
  • 소비자가 너무 빠른 소비를 하여 큐가 비었을 경우 소비자 스레드는 큐에 데이터가 들어올 때 까지 기다려야 한다.

 이처럼 두 다른 역할을 하는 스레드가 특정 자원을 함께 생산하고, 소비하면서 발생하는 문제를 생산자-소비자 문제 또는 한정된 버퍼 문제라 한다.

3. 해결?

이 문제는 Lock과 ReentrantLock을 이용하여 다음과 같이 해결할 수 있다.

package thread.bounded;

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import static util.MyLogger.log;

public class BoundedQueueV5 implements BoundedQueue{

    private final Queue<String> queue = new ArrayDeque<>();
    private final int max;

    private final Lock lock = new ReentrantLock();
    private final Condition producerCond = lock.newCondition();
    private final Condition consumerCond = lock.newCondition();

    public BoundedQueueV5(int max) {
        this.max = max;
    }

    @Override
    public void put(String data) {
        lock.lock();
        try {
            while (queue.size() == max) {
                log("[put] 큐가 가득 참, 생산자 대기");
                try {
                    producerCond.await();
                    log("[put] 생산자 깨어남");
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            queue.offer(data);
            log("[put] 생산자 데이터 저장,  signal() 호출");
            consumerCond.signal();
        }finally {
            lock.unlock();

        }

    }

    @Override
    public String take() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                log("[take] 큐에 데이터가 없음, 생산자 대기");
                try {
                    consumerCond.await();
                    log("[take] 소비자 깨어남");
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }

            String data = queue.poll();
            log("[take] 소비자 데이터 획득, signal() 호출");
            producerCond.signal();
            return data;
        }finally {
            lock.unlock();
        }


    }

    @Override
    public String toString() {
        return queue.toString();
    }
}

이의 역할에 대해 하나씩 살펴보자.

    private final Queue<String> queue = new ArrayDeque<>();
    private final int max;

    private final Lock lock = new ReentrantLock();
    private final Condition producerCond = lock.newCondition();
    private final Condition consumerCond = lock.newCondition();

데이터를 저장할 큐, 이의 크기 제한이 있고, 생산자 스레드가 대기할 대기 공간, 소비자 스레드가 대기할 대기 공간이 있다. Condition이 이 역할을 수행한다.

    @Override
    public void put(String data) {
        lock.lock();
        try {
            while (queue.size() == max) {
                log("[put] 큐가 가득 참, 생산자 대기");
                try {
                    producerCond.await();
                    log("[put] 생산자 깨어남");
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            queue.offer(data);
            log("[put] 생산자 데이터 저장,  signal() 호출");
            consumerCond.signal();
        }finally {
            lock.unlock();

        }

    }

 생산자 스레드는 락을 획득하고, 데이터를 생산하여 큐에 집어넣기를 시도한다. 큐가 가득 차 있다면 await를 호출하여 생산자 스레드 전용 대기 공간에 보관한다. 데이터를 저장했다면 "소비자 대기 공간"에 signal을 날려 소비자 스레드를 깨운다. 

    @Override
    public String take() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                log("[take] 큐에 데이터가 없음, 생산자 대기");
                try {
                    consumerCond.await();
                    log("[take] 소비자 깨어남");
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }

            String data = queue.poll();
            log("[take] 소비자 데이터 획득, signal() 호출");
            producerCond.signal();
            return data;
        }finally {
            lock.unlock();
        }


    }

 반대로 소비자 스레드는 큐가 비어있을 시 소비자 스레드 대기 공간에 대기하고 있다가 데이터 소비에 성공하면 생산자를 호출한다.

그림처럼 Condition이라 불리는 스레드 대기 공간이 있고, Lock에서 관리하는 락 대기공간이 있다. 스레드 대기 공간에서 깨어난 스레드는 락 획득을 시도하고, 다른 스레드가 사용중이어서 획득에 실패할 경우 WAITING 상태로 락 대기공간의 대기 큐에서 대기한다.

4. BlockingQueue

위 로직을 편리하게 실행할 수 있는 인터페이스가 바로 BlockingQueue이다. 생성 시 큐의 크기를 정할 수 있고, 큐가 가득 찼을 시의 생산자나 큐가 비어있을 시의 소비자와 같이 스레드가 대기해야 할 경우 어떤 일을 할 지 정할 수 있다.

  • Throwable Exception(예외): add(e), remove(), element()
  • Special Value(즉시 반환): offer(e), poll(), peek()
  • Blocks(대기): put(e), take()
  • Times Out(시간 대기): offer(e, time, unit), poll(time, unit)
public class BoundedQueueV6_1 implements BoundedQueue{

    private BlockingQueue<String> queue;

    public BoundedQueueV6_1(int max) {
        this.queue = new ArrayBlockingQueue<>(max);
    }


    @Override
    public void put(String data) {
        try {
            queue.put(data);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public String take() {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public String toString() {
        return queue.toString();
    }
}

예시 코드 중 하나이다. 생산자 스레드는 큐가 가득 찼다면 공간이 날 때까지, 소비자 스레드는 큐가 비어있다면 데이터가 생산될 때 까지 대기한다.

    public void put(E e) throws InterruptedException {
        Objects.requireNonNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

구현체 중 하나인 ArrayBlockingQueue의 put 함수이다. null이 아닌 데이터 e를 받아 락을 획득한다. 큐가 가득 찼다면 notFull(생산자 대기 공간)에서 대기하다가 enqueue를 통해 데이터를 넣는다.

    private void enqueue(E e) {
        // assert lock.isHeldByCurrentThread();
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = e;
        if (++putIndex == items.length) putIndex = 0;
        count++;
        notEmpty.signal();
    }

enqueue 함수이다. 데이터를 넣고, 큐의 크기를 갱신한 후 notEmpty(소비자 대기 공간)에 signal을 날려주는 것을 확인할 수 있다. 전반적으로 위에서 ReentrantLock을 이용하여 구현한 로직과 유사하다.

여담)

    @Override
    public synchronized void put(String data) {
        while (queue.size() == max) {
            log("[put] 큐가 가득 참, 생산자 대기");
            sleep(1000);
        }
        queue.offer(data);
    }

이 코드에서는 while문을 통해 스레드가 일정 시간마다 큐가 가득 찼는 지 확인한다. 이 과정에서 CPU 자원이 낭비될 우려가 있는 데 이를 busy-wait이라 한다.

'Java' 카테고리의 다른 글

[Java] 동시성 문제 해결하기  (0) 2024.09.03
Java Record  (0) 2024.08.31
Java volatile - 메모리 가시성  (0) 2024.08.29