Computer Science 기본 지식/운영체제

[C++ Thread] 생산자 소비자 문제

로파이 2021. 4. 24. 19:02

멀티 스레드를 이용하여 생산자-소비자 문제를 구현해보고 문제가 되는 부분을 해결해본다.

1. 생산자는 매 0.5마다 data chunk(char)를 큐에 넣는 것으로 데이터를 생산한다.

2. 소비자는 매번 마다 큐에 데이터가 있다면 자신의 데이터에 추가하고 업다면 1초 대기한다.

 

n 생산자 m 소비자를 여러 쓰레드로 구현한다면 공유 변수 문제는 다음과 같고 쉽게 해결할 수 있다.

- 공유 변수 : 큐를 접근할 때 하나의 쓰레만 읽고 쓸 수 있도록한다.

- mutex의 사용으로 해결

#include <iostream>
#include <mutex>
#include <vector>
#include <thread>
#include <chrono>
#include <string>
#include <time.h>
#include <queue>
using std::thread;
using std::queue;
using std::mutex;
using std::string;

#define PR_NUM 3
#define CS_NUM 5

char DataChunk()
{
	return rand() % 128;
}
class Producer
{
private:
	int _id = -1;
	mutex* _mtx;
	queue<char>* _shared_buffer;
	static constexpr int _delay = 500; // 0.5초 마다 데이터를 생산
public:
	Producer() = default;
	Producer(mutex* mtx, queue<char>* shared_buffer) 
		: _mtx(mtx), _shared_buffer(shared_buffer) 
	{
	}
	void SetId(int id) { _id = id; }
	void Produce()
	{
		static int i = 0;
		while (i++ < 100)
		{
			_mtx->lock();
			_shared_buffer->push(DataChunk());
			_mtx->unlock();
			std::this_thread::sleep_for(std::chrono::milliseconds(_delay));
		}
	}
};
class Consumer
{
private:
	int _id = -1;
	mutex* _mtx;
	queue<char>* _shared_buffer;
	static constexpr int _delay = 1000; // 최대 1초 마다 데이터를 수신
	string _data;
public:
	Consumer() = default;
	Consumer(mutex* mtx, queue<char>* shared_buffer)
		: _mtx(mtx), _shared_buffer(shared_buffer)
	{
	}
	void SetId(int id) { _id = id; }
	void Consume()
	{
		while (_data.size() < 10)
		{
			_mtx->lock();
			if (!_shared_buffer->empty())
			{
				char data = _shared_buffer->front();
				_shared_buffer->pop();
				_data.push_back(data);
				printf("This Thread [%d] : %s\n", _id, _data.c_str());
				_mtx->unlock();
				continue;
			}
			_mtx->unlock();

			// 데이터가 없다면 오래 쉬었다가 다시 확인한다.
			std::this_thread::sleep_for(std::chrono::microseconds(_delay));
		}
	}
};
int main()
{
	srand((unsigned int) time(NULL));
	mutex* s_mtx = new mutex;
	queue<char> *s_buffer = new queue<char>;

	Producer* producers = new Producer[PR_NUM];
	for (int i = 0; i < PR_NUM; ++i)
	{
		producers[i] = Producer(s_mtx, s_buffer);
	}
	Consumer* consumers = new Consumer[CS_NUM];
	for (int i = 0; i < CS_NUM; ++i)
	{
		consumers[i] = Consumer(s_mtx, s_buffer);
	}

	thread pr_th[PR_NUM];
	for (int i = 0; i < PR_NUM; ++i)
	{
		pr_th[i] = thread(&Producer::Produce, &producers[i]);
		producers[i].SetId(i);
	}

	thread cs_th[CS_NUM];
	for (int i = 0; i < CS_NUM; ++i)
	{
		cs_th[i] = thread(&Consumer::Consume, &consumers[i]);
		consumers[i].SetId(i);
	}

	for (int i = 0; i < PR_NUM; ++i)
	{
		pr_th[i].join();
	}

	for (int i = 0; i < CS_NUM; ++i)
	{
		cs_th[i].join();
	}

	delete[] producers;
	delete[] consumers;
	delete s_buffer;
	delete s_mtx;
	return 0;
}

 

개선해야할 점

소비자는 큐에 데이터가 있는지 주기적으로 확인을 해야한다. 이를 생산자가 데이터를 생산했을 때를 알림 형식으로 알려주고 그 때만 데이터를 처리할 수 있게끔 한다면 쓰레드를 block하는 것 없이 효율적으로 바로 처리할 수 있을 것이다.

 

std::conditional_variable

조건 변수 cv 객체는 대기(block) 중인 쓰레드를 보관하여 특정 알림이 왔을 때 쓰레드를 wakeup하여 임계영역에 진입하도록 할 수 있다.

 

wait 함수

void wait(std::unique_lock<std::mutex>& lock);
template<class Predicate>
void wait(std::unique_lock<std::mutex>& lock, Predicate pred);

wait함수는 특정 조건 Predicate() 함수를 만족할 때 루프를 반복하며 현재 실행 쓰레드를 대기 시키고 cv의 대기 큐에 넣는다. 다른 쓰레드의 cv가 notify()함수를 통해 자신의 쓰레드가 wakeup이 되었을 때 임계영역에 진입할 수 있게 한다.

따라서 조건이 추가된 wait은 다음과 같은 의미를 지닌다.

while(!pred())
{
	wait(lock);
}

해당 조건이 false일 경우 wait은 lock 객체를 unlock하고 현재 쓰레드를 무한정 대기하게 된다.

wakeup되고 조건이 true에 해당되어 루프를 탈출 할 경우 lock 객체를 획득한다. 주의할 점은 이후에 또다시 lock을 한다면 terminate() 호출로 예외를 발생시킨다. 또한 쓰레드는 wait를 호출할 때 전달한 unique_lock 인스턴스가 현재 쓰레드를 lock 상태 이어야한다.

 

cv 객체를 이용한 생산자 소비자 문제

- 생산자

unique_lock은 defere_lock 인자를 통해 생성할 때 잠그지 않고 lock 타이밍을 결정할 수 있다.

	void Produce()
	{
		std::unique_lock<mutex> lk(*_mtx, std::defer_lock);
		static int i = 0;
		while (i++ < 100)
		{
			lk.lock();
			_shared_buffer->push(DataChunk());
			lk.unlock();

			_cv->notify_one();
			std::this_thread::sleep_for(std::chrono::milliseconds(_delay));
		}
		printf("Produce [%d] ends\n", _id);
	}

 

- 소비자

마찬가지로 잠금을 지연시켰다가 쓰레드가 wakeup을 하여 공유 변수를 처리하도록한다.

	void Consume()
	{
		// 잠금을 지연시킨다.
		std::unique_lock<mutex> lk(*_mtx, std::defer_lock);
		
		while (true)
		{
			// 잠금 시작
			lk.lock();

			_cv->wait(lk, [this, &lk]() {
				// false일 떄 루프가 반복된다.
				return !_shared_buffer->empty() || _data.size() == 10;
			});

			char data = _shared_buffer->front();
			_shared_buffer->pop();
			_data.push_back(data);

			printf("This Thread [%d] : %s\n", _id, _data.c_str());

			lk.unlock();

			if (_data.size() == 10)
				return;
		}
		
		//// 데이터가 없다면 오래 쉬었다가 다시 확인한다.
		//std::this_thread::sleep_for(std::chrono::microseconds(_delay));
	}