멀티 스레드를 이용하여 생산자-소비자 문제를 구현해보고 문제가 되는 부분을 해결해본다.
1. 생산자는 매 0.5마다 data chunk(char)를 큐에 넣는 것으로 데이터를 생산한다.
2. 소비자는 매번 마다 큐에 데이터가 있다면 자신의 데이터에 추가하고 업다면 1초 대기한다.
n 생산자 m 소비자를 여러 쓰레드로 구현한다면 공유 변수 문제는 다음과 같고 쉽게 해결할 수 있다.
- 공유 변수 : 큐를 접근할 때 하나의 쓰레만 읽고 쓸 수 있도록한다.
- mutex의 사용으로 해결
#include <iostream>
#include <mutex>
#include <vector>
#include <thread>
#include <chrono>
#include <string>
#include <time.h>
#include <queue>
using std::thread;
using std::queue;
using std::mutex;
using std::string;
#define PR_NUM 3
#define CS_NUM 5
char DataChunk()
{
return rand() % 128;
}
class Producer
{
private:
int _id = -1;
mutex* _mtx;
queue<char>* _shared_buffer;
static constexpr int _delay = 500; // 0.5초 마다 데이터를 생산
public:
Producer() = default;
Producer(mutex* mtx, queue<char>* shared_buffer)
: _mtx(mtx), _shared_buffer(shared_buffer)
{
}
void SetId(int id) { _id = id; }
void Produce()
{
static int i = 0;
while (i++ < 100)
{
_mtx->lock();
_shared_buffer->push(DataChunk());
_mtx->unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(_delay));
}
}
};
class Consumer
{
private:
int _id = -1;
mutex* _mtx;
queue<char>* _shared_buffer;
static constexpr int _delay = 1000; // 최대 1초 마다 데이터를 수신
string _data;
public:
Consumer() = default;
Consumer(mutex* mtx, queue<char>* shared_buffer)
: _mtx(mtx), _shared_buffer(shared_buffer)
{
}
void SetId(int id) { _id = id; }
void Consume()
{
while (_data.size() < 10)
{
_mtx->lock();
if (!_shared_buffer->empty())
{
char data = _shared_buffer->front();
_shared_buffer->pop();
_data.push_back(data);
printf("This Thread [%d] : %s\n", _id, _data.c_str());
_mtx->unlock();
continue;
}
_mtx->unlock();
// 데이터가 없다면 오래 쉬었다가 다시 확인한다.
std::this_thread::sleep_for(std::chrono::microseconds(_delay));
}
}
};
int main()
{
srand((unsigned int) time(NULL));
mutex* s_mtx = new mutex;
queue<char> *s_buffer = new queue<char>;
Producer* producers = new Producer[PR_NUM];
for (int i = 0; i < PR_NUM; ++i)
{
producers[i] = Producer(s_mtx, s_buffer);
}
Consumer* consumers = new Consumer[CS_NUM];
for (int i = 0; i < CS_NUM; ++i)
{
consumers[i] = Consumer(s_mtx, s_buffer);
}
thread pr_th[PR_NUM];
for (int i = 0; i < PR_NUM; ++i)
{
pr_th[i] = thread(&Producer::Produce, &producers[i]);
producers[i].SetId(i);
}
thread cs_th[CS_NUM];
for (int i = 0; i < CS_NUM; ++i)
{
cs_th[i] = thread(&Consumer::Consume, &consumers[i]);
consumers[i].SetId(i);
}
for (int i = 0; i < PR_NUM; ++i)
{
pr_th[i].join();
}
for (int i = 0; i < CS_NUM; ++i)
{
cs_th[i].join();
}
delete[] producers;
delete[] consumers;
delete s_buffer;
delete s_mtx;
return 0;
}
개선해야할 점
소비자는 큐에 데이터가 있는지 주기적으로 확인을 해야한다. 이를 생산자가 데이터를 생산했을 때를 알림 형식으로 알려주고 그 때만 데이터를 처리할 수 있게끔 한다면 쓰레드를 block하는 것 없이 효율적으로 바로 처리할 수 있을 것이다.
std::conditional_variable
조건 변수 cv 객체는 대기(block) 중인 쓰레드를 보관하여 특정 알림이 왔을 때 쓰레드를 wakeup하여 임계영역에 진입하도록 할 수 있다.
wait 함수
void wait(std::unique_lock<std::mutex>& lock);
template<class Predicate>
void wait(std::unique_lock<std::mutex>& lock, Predicate pred);
wait함수는 특정 조건 Predicate() 함수를 만족할 때 루프를 반복하며 현재 실행 쓰레드를 대기 시키고 cv의 대기 큐에 넣는다. 다른 쓰레드의 cv가 notify()함수를 통해 자신의 쓰레드가 wakeup이 되었을 때 임계영역에 진입할 수 있게 한다.
따라서 조건이 추가된 wait은 다음과 같은 의미를 지닌다.
while(!pred())
{
wait(lock);
}
해당 조건이 false일 경우 wait은 lock 객체를 unlock하고 현재 쓰레드를 무한정 대기하게 된다.
wakeup되고 조건이 true에 해당되어 루프를 탈출 할 경우 lock 객체를 획득한다. 주의할 점은 이후에 또다시 lock을 한다면 terminate() 호출로 예외를 발생시킨다. 또한 쓰레드는 wait를 호출할 때 전달한 unique_lock 인스턴스가 현재 쓰레드를 lock 상태 이어야한다.
cv 객체를 이용한 생산자 소비자 문제
- 생산자
unique_lock은 defere_lock 인자를 통해 생성할 때 잠그지 않고 lock 타이밍을 결정할 수 있다.
void Produce()
{
std::unique_lock<mutex> lk(*_mtx, std::defer_lock);
static int i = 0;
while (i++ < 100)
{
lk.lock();
_shared_buffer->push(DataChunk());
lk.unlock();
_cv->notify_one();
std::this_thread::sleep_for(std::chrono::milliseconds(_delay));
}
printf("Produce [%d] ends\n", _id);
}
- 소비자
마찬가지로 잠금을 지연시켰다가 쓰레드가 wakeup을 하여 공유 변수를 처리하도록한다.
void Consume()
{
// 잠금을 지연시킨다.
std::unique_lock<mutex> lk(*_mtx, std::defer_lock);
while (true)
{
// 잠금 시작
lk.lock();
_cv->wait(lk, [this, &lk]() {
// false일 떄 루프가 반복된다.
return !_shared_buffer->empty() || _data.size() == 10;
});
char data = _shared_buffer->front();
_shared_buffer->pop();
_data.push_back(data);
printf("This Thread [%d] : %s\n", _id, _data.c_str());
lk.unlock();
if (_data.size() == 10)
return;
}
//// 데이터가 없다면 오래 쉬었다가 다시 확인한다.
//std::this_thread::sleep_for(std::chrono::microseconds(_delay));
}
'Computer Science 기본 지식 > 운영체제' 카테고리의 다른 글
[C++ Thread] 약속과 미래 객체, std::promise / std::future (0) | 2021.04.30 |
---|---|
[C++ Thread] memory order 와 atomic 객체 (0) | 2021.04.27 |
[C++ Thread] 공유 변수와 경쟁 조건 (0) | 2021.04.24 |
[C++ Thread] C++ Thread 관리하기 (0) | 2021.04.24 |
[C++ Thread] C++ Thread 생성하기 (0) | 2021.04.24 |