1. 개요
본 포스팅은 김영한 강사님의 인프런 강의 "자바 고급 1편" 중 생산자-소비자 문제에 대해 정리한 포스팅입니다.
2. 생산자-소비자 문제?
생산자 스레드와 소비자 스레드가 존재한다. 두 종류의 스레드는 critical section 내부의 큐에 접근하여 데이터를 역할에 맞춰 생산 및 소비하고자 한다. 하지만 큐의 길이는 한정되어 있으므로 다음의 문제가 발생할 수 있다.
- 생산자가 너무 빠른 생산을 하여 큐가 가득 찼을 경우 생산자 스레드는 큐에 빈 공간이 생길 때 까지 기다려야 한다.
- 소비자가 너무 빠른 소비를 하여 큐가 비었을 경우 소비자 스레드는 큐에 데이터가 들어올 때 까지 기다려야 한다.
이처럼 두 다른 역할을 하는 스레드가 특정 자원을 함께 생산하고, 소비하면서 발생하는 문제를 생산자-소비자 문제 또는 한정된 버퍼 문제라 한다.
3. 해결?
이 문제는 Lock과 ReentrantLock을 이용하여 다음과 같이 해결할 수 있다.
package thread.bounded;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static util.MyLogger.log;
public class BoundedQueueV5 implements BoundedQueue{
private final Queue<String> queue = new ArrayDeque<>();
private final int max;
private final Lock lock = new ReentrantLock();
private final Condition producerCond = lock.newCondition();
private final Condition consumerCond = lock.newCondition();
public BoundedQueueV5(int max) {
this.max = max;
}
@Override
public void put(String data) {
lock.lock();
try {
while (queue.size() == max) {
log("[put] 큐가 가득 참, 생산자 대기");
try {
producerCond.await();
log("[put] 생산자 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
queue.offer(data);
log("[put] 생산자 데이터 저장, signal() 호출");
consumerCond.signal();
}finally {
lock.unlock();
}
}
@Override
public String take() {
lock.lock();
try {
while (queue.isEmpty()) {
log("[take] 큐에 데이터가 없음, 생산자 대기");
try {
consumerCond.await();
log("[take] 소비자 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
String data = queue.poll();
log("[take] 소비자 데이터 획득, signal() 호출");
producerCond.signal();
return data;
}finally {
lock.unlock();
}
}
@Override
public String toString() {
return queue.toString();
}
}
이의 역할에 대해 하나씩 살펴보자.
private final Queue<String> queue = new ArrayDeque<>();
private final int max;
private final Lock lock = new ReentrantLock();
private final Condition producerCond = lock.newCondition();
private final Condition consumerCond = lock.newCondition();
데이터를 저장할 큐, 이의 크기 제한이 있고, 생산자 스레드가 대기할 대기 공간, 소비자 스레드가 대기할 대기 공간이 있다. Condition이 이 역할을 수행한다.
@Override
public void put(String data) {
lock.lock();
try {
while (queue.size() == max) {
log("[put] 큐가 가득 참, 생산자 대기");
try {
producerCond.await();
log("[put] 생산자 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
queue.offer(data);
log("[put] 생산자 데이터 저장, signal() 호출");
consumerCond.signal();
}finally {
lock.unlock();
}
}
생산자 스레드는 락을 획득하고, 데이터를 생산하여 큐에 집어넣기를 시도한다. 큐가 가득 차 있다면 await를 호출하여 생산자 스레드 전용 대기 공간에 보관한다. 데이터를 저장했다면 "소비자 대기 공간"에 signal을 날려 소비자 스레드를 깨운다.
@Override
public String take() {
lock.lock();
try {
while (queue.isEmpty()) {
log("[take] 큐에 데이터가 없음, 생산자 대기");
try {
consumerCond.await();
log("[take] 소비자 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
String data = queue.poll();
log("[take] 소비자 데이터 획득, signal() 호출");
producerCond.signal();
return data;
}finally {
lock.unlock();
}
}
반대로 소비자 스레드는 큐가 비어있을 시 소비자 스레드 대기 공간에 대기하고 있다가 데이터 소비에 성공하면 생산자를 호출한다.

그림처럼 Condition이라 불리는 스레드 대기 공간이 있고, Lock에서 관리하는 락 대기공간이 있다. 스레드 대기 공간에서 깨어난 스레드는 락 획득을 시도하고, 다른 스레드가 사용중이어서 획득에 실패할 경우 WAITING 상태로 락 대기공간의 대기 큐에서 대기한다.
4. BlockingQueue
위 로직을 편리하게 실행할 수 있는 인터페이스가 바로 BlockingQueue이다. 생성 시 큐의 크기를 정할 수 있고, 큐가 가득 찼을 시의 생산자나 큐가 비어있을 시의 소비자와 같이 스레드가 대기해야 할 경우 어떤 일을 할 지 정할 수 있다.
- Throwable Exception(예외): add(e), remove(), element()
- Special Value(즉시 반환): offer(e), poll(), peek()
- Blocks(대기): put(e), take()
- Times Out(시간 대기): offer(e, time, unit), poll(time, unit)
public class BoundedQueueV6_1 implements BoundedQueue{
private BlockingQueue<String> queue;
public BoundedQueueV6_1(int max) {
this.queue = new ArrayBlockingQueue<>(max);
}
@Override
public void put(String data) {
try {
queue.put(data);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
public String take() {
try {
return queue.take();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
public String toString() {
return queue.toString();
}
}
예시 코드 중 하나이다. 생산자 스레드는 큐가 가득 찼다면 공간이 날 때까지, 소비자 스레드는 큐가 비어있다면 데이터가 생산될 때 까지 대기한다.
public void put(E e) throws InterruptedException {
Objects.requireNonNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
구현체 중 하나인 ArrayBlockingQueue의 put 함수이다. null이 아닌 데이터 e를 받아 락을 획득한다. 큐가 가득 찼다면 notFull(생산자 대기 공간)에서 대기하다가 enqueue를 통해 데이터를 넣는다.
private void enqueue(E e) {
// assert lock.isHeldByCurrentThread();
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = e;
if (++putIndex == items.length) putIndex = 0;
count++;
notEmpty.signal();
}
enqueue 함수이다. 데이터를 넣고, 큐의 크기를 갱신한 후 notEmpty(소비자 대기 공간)에 signal을 날려주는 것을 확인할 수 있다. 전반적으로 위에서 ReentrantLock을 이용하여 구현한 로직과 유사하다.
여담)
@Override
public synchronized void put(String data) {
while (queue.size() == max) {
log("[put] 큐가 가득 참, 생산자 대기");
sleep(1000);
}
queue.offer(data);
}
이 코드에서는 while문을 통해 스레드가 일정 시간마다 큐가 가득 찼는 지 확인한다. 이 과정에서 CPU 자원이 낭비될 우려가 있는 데 이를 busy-wait이라 한다.
'Java' 카테고리의 다른 글
| [Java] 동시성 문제 해결하기 (0) | 2024.09.03 |
|---|---|
| Java Record (0) | 2024.08.31 |
| Java volatile - 메모리 가시성 (0) | 2024.08.29 |