JAVA/비동기 처리

JAVA 생산자 소비자 패턴, BlockingQueue

수달하나 2022. 12. 1. 11:30

BlockingQueue가 사용된 코드를 보면서 가장먼저 확인해 봐야 했던 부분은 생산자 소비자 패턴이다.

 

 

(모든 경우가 위와 같은 경우가 될 수는 없지만 일반적인 구조는 위와 같은 형태)

 

멀티 쓰레드 환경이나 혹은 다양한 부분에서 높은 비율의 공통 사용 로직이 있다고 가정할 때 과부하를 막기위해서 어떠한 조치를 해야한다. 이 문제해결을 위해서 나온 것이 바로 생산자 소비자 패턴인데 공통적으로 수행해야 할 Task를 중심으로 작업을 생산해 내는 생산자(Producer)작업을 처리하는 부분인 소비자(Consumer)의 형태로 나눠서 부하를 조절하겠다는것이고 생산된 작업 Task 는 Queue라는 공통 자원을 통해 관리, 생산자와 소비자의 역할을 분리시키자는 개념이다.

 

분리된 생성자와 소비자는 각각의 역할만 수행 할 뿐 전반적인 로직 자체에 관여하지 않기 때문에 부하를 줄일 수 있다는 큰 장점이 있다.

 

생성자들로부터 생성된 Task 를 Queue에 넣고 소비자들은 Queue에 쌓인 Task를 확인후 처리하게 된다.

그렇다면 왜 BlockingQueue 를 사용하는 것 일까?

아래코드는 실제 BlockingQueue를 이용한 생성자 소비자 패턴의 예시이다.

 

public class Producer implements Runnable{
    private final BlockingQueue queue;

    public Producer(BlockingQueue q){
        this.queue = q;
    }

    @Override
    public void run(){
        try {
            //TODO queue 에 Task 할당
        }catch (Exception e ){
            e.printStackTrace();
        }
    }
}

 

public class Consumer implements Runnable{
    private final BlockingQueue queue;

    public Consumer(BlockingQueue q){
        this.queue = q;
    }

    @Override
    public void run() {
        try{
            //TODO queue 로 부터 Task 수행
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

 

public static void main(String[] args) {

	BlockingQueue queue = (BlockingQueue) new Object();
	Producer producer = new Producer(queue);
	Consumer consumer = new Consumer(queue);
		
	new Thread(producer).start();
	new Thread(consumer).start();

}

 

위와 같은 BlockingQueue 는 공유자원으로 사용되며 java.util.concurrent 패키지에 있는 Queue를 상속받는 인터페이스 이다. 일반 Queue 같은 경우 멀티 쓰레드 영역에서의 동기화가 적용되지 않다보니 BlockingQueue를 통해서 멀티 쓰레드 영역의 동시적 처리를 대신하는 것인데 concurrent 는 동시에 발생하는 이라는 의미를 담고 있에 멀티 쓰레드와 같은 상황에서 많이 사용됨을 유추할 수 있을 뿐더러 실제 concurrent 패키지를 통해 확인해보면 BlockingQueue의 예시로 멀티 쓰레드 환경에서의 생성자 소비자 패턴 사용을 전형적인 시나리오로 얘기 하고 있다. 위에서 봤던 코드는 사실 라이브러리 자체에서 제공했던 예시 코드였다.

 

 

이런 BlockingQueue의 사용에 있어서도 소비자와 생산자의 구현 형태에 따라서 ConcurrentBlockingQueueLinkedBlockingQueue로 나눠서 사용 할 수 있는데 ConcurrentLinkedQueue의 경우 queue에 꺼낼 원소가 없다면 즉시 리턴하고 다른일을 수행하는 로직을 따르기 때문에 생산자가 하나이고 소비자가 많을 경우 사용하는 반면 LinkedBlockingQueue의 경우에는 큐에서 꺼내갈 원소가 없을 경우 wait 상태를 유지하고 queue 의 최대 용량을 정의할 수 있기 때문에 생산자가 많고 소비자가 하나일 경우에 사용하게 된다. 

 

일반적인 상태에 대한 부분이긴 하지만 사실 개발자의 구현 방법에 따라서 혹은 시스템의 특정 상황에 따라서 구현 방식이 달라져야 하는데 나는 처음 이 패턴을 확인하게 된 부분이 패킷을 분석하는 로직이었다.

 

실제 queue를 통해 쓰레드의 상태를 wait 시키거나 혹은 직접 함수를 호출하여 쓰레드를 sleep 상태로 유지하는 부분에서 차이가 있었지만 어떤 방식으로 시스템을 구현할 것인지에 대한 부분은 어떤 방법이 더 효율적으로 시스템을 설계할 것인지에 대한 개발자의 생각에 달려있기에 항상 잘 생각하며 로직을 구현하는 것이 필요한것 같다.