은행 창구에서 고객들을 맞이하는 시스템을 구현하고 시뮬레이션 해본다.
다음 요구사항을 만족하는 프로그램을 작성한다.
1. 은행 손님들은 임의의 시간에 은행에 들어와 대기표를 발급 받는다.
2. 대기표를 받은 순서대로 빈 창구 있는 경우 바로 서비스를 받을 수 있게 한다.
3. 각 창구에서 일하고 있는 은행원은 은행 업무 서비스를 제공하고 대기 번호가 하나라도 있는 경우 업무를 맡을 수 있게 한다.
4. 각 창구는 한 명의 은행원이 맡으며 총 은행원의 수는 정해져 있다.
5. 이제 대기표 발급과 창구로의 은행 업무 할당과 관련된 로직을 설계하고 다중 스레드로 시뮬레이션 해본다.
배경
동기화 Synchronization
- 대기표
은행에 방문한 손님은 다중 스레드로 표현되는 임의의 시점에서 동시에 대기표를 발급 받을 수 있다. 대기표는 각 손님에게 유니크한 정수형 번호를 발급해야하며 같은 번호가 둘 이상의 손님에서 발급되거나 건너뛰어서 발급되면 안된다.
- 대기 리스트
각 창구에 매칭되는 손님은 한 명이어야한다. 대기 번호로 줄 지어진 리스트(큐)가 있다면 그러한 리스트에서 다음 배정받기 위해 꺼내지거나 새로운 대기번호가 큐잉되기 위해서는 동기화되어야한다.
- 로그
프로그램이 동작하는 동안 시뮬레이션의 결과를 중간 중간 확인할 수 있도록 스트링 메시지를 저장해둔다. 이 또한 여러 스레드에서 접근가능하므로 스레드 안전한 로깅을 제공해야한다.
위와 같은 동기화를 위해 std::mutex와 std::lock_guard를 이용하여 적절히 구현하면 된다.
생산자-소비자 문제 Producer-Consumer Problem
위 문제의 경우 생산자-소비자 문제와 비슷하다. 손님은 대기표를 받아 처리해야할 제품을 만드는 생산자 역할이고 은행원은 각 대기표를 받아 은행 업무를 처리하는 소비자 역할이다.
생산자-소비자 문제를 구현하기 위해 각 은행원의 행동은 스레드 풀로 만들어준다. 스레드 풀으로 표현되는 각 은행원들은 자신의 스레드에서 받을 수 있는 대기 번호가 생길때까지 블록(block) 상태에 놓인다.
자신의 창구가 비어있는 상태이고 대기 리스트에 번호가 있다면 자신의 스레드를 꺠우고 맨 앞 번호를 꺼내어 은행 업무를 처리하도록 한다.
std::unique_lock와 std::condition_variable
<mutex> 헤더에 정의되어 있는 unique_lock을 보자.
// CLASS TEMPLATE unique_lock
template <class _Mutex>
class unique_lock { // whizzy class with destructor that unlocks mutex
public:
using mutex_type = _Mutex;
// CONSTRUCT, ASSIGN, AND DESTROY
unique_lock() noexcept : _Pmtx(nullptr), _Owns(false) {}
_NODISCARD_CTOR explicit unique_lock(_Mutex& _Mtx)
: _Pmtx(_STD addressof(_Mtx)), _Owns(false) { // construct and lock
_Pmtx->lock();
_Owns = true;
}
explicit 생성자로 정의된 unique_lock의 경우 mutex를 레퍼런스로 받아 잠금을 수행한 뒤 객체를 생성한다.
락 가드(std::lock_guard)와 같이 소멸자에서 소유 중인 경우 잠금 해제를 한다.
~unique_lock() noexcept {
if (_Owns) {
_Pmtx->unlock();
}
}
락 가드와 다른 특징은 유니크 락을 통해 획득한 잠금을 도중에 풀거나 다시 잠글 수 있다는 점이다. 또한 try_lock이나 try_lock_for 등 블록없는 잠금 시도 또한 가능하다.
std::condition_variable 조건 변수
생산자-소비자 프로그램에서 소비자에게 처리할 product가 도착했다는 신호(signal)를 알리기 위해 사용한다. 유니크 락은 조건 변수의 wait() 함수가 유니크 락을 인자로 받기 때문에 유니크 락을 사용해야하는 이유이기도 하다.
void wait(unique_lock<mutex>& _Lck) { // wait for signal
// Nothing to do to comply with LWG-2135 because std::mutex lock/unlock are nothrow
_Cnd_wait(_Mycnd(), _Lck.mutex()->_Mymtx());
}
template <class _Predicate>
void wait(unique_lock<mutex>& _Lck, _Predicate _Pred) { // wait for signal and test predicate
while (!_Pred()) {
wait(_Lck);
}
}
주로 사용되는 버전은 아래 함수처럼 Predicate 함수를 하나 받는 버전인데 조건 변수는 이상적으로는 notify_one()/notify_all()과 같은 알림에 의해 깨어나야 하지만 가짜 알림(spurious wakeup)이 일어날 수도 있다고 언급되어 있다. 따라서 실제 깨어난 스레드가 정말로 일어나야하는 상황인 지 체크하기 위한 이유도 있다.
std::mutex mtx = {};
std::condition_variable cv = {};
bool Signaled = false;
void BlockThisThreadUntilWakeup()
{
// 락을 획득한다.
std::unique_lock<std::mutex> lk(mtx);
// Predicate가 false인 경우 획득한 락을 해제하고 스레드를 블락한다.
cv.wait(lk, [&]() { return Signaled });
// 이후 영역은 조건 변수에 의해 mtx 뮤텍스 객체가 락을 획득한 상태로 진행된다.
// 유니크 락을 통해 락을 해제할 수 있다.
lk.unlock();
}
조건 변수의 wait()을 이용하면 Predicate 조건을 검사하여 깨어나는 상황이 아닌 경우 락을 풀고 스레드를 블록시킨다.
만약 정상적으로 깨어났다면, 유니크 락을 통해 락을 획득한 상태가 되고 이후 영역에서 필요하다면 다시 풀 수도 있다.
조건 변수 사용 주의 사항
Predicate를 사용하는 주된 이유는 따로 있는데, Predicate가 없다면 조건 변수는 반드시 wait -> signal 순서로 일어나야 알림을 받을 수 있다.
static void UnNotified()
{
std::mutex mtx = {};
std::condition_variable cv = {};
const auto wait_thread = [&]()
{
std::unique_lock<std::mutex> lk(mtx);
std::cout << "Wait" << std::endl;
cv.wait(lk);
std::cout << "Signal" << std::endl;
};
const auto signal_thread = [&]()
{
cv.notify_one();
std::cout << "Notify" << std::endl;
};
;
std::thread t1(signal_thread);
std::this_thread::sleep_for(1000ms);
std::thread t2(wait_thread);
std::this_thread::sleep_for(1000ms);
t1.join();
t2.join();
}
위 함수를 실행하면 문제는 Notify가 먼저 나오는 경우 Signal 결과를 영원히 받을 수 없다는 것이다.
조건 변수의 wait()의 함수 정의를 다시 살펴보면 락을 획득한 상태에서 while(!Pred())로 한번 조건을 검사한다는 것이다. 따라서 깨어날 상황이 이미 만족되어 있다면 조건 변수가 락을 해제하고 스레드를 블록시키는 wait(_Lck)를 실행하지 않는다.
Predicate를 이용한 wait() 버전
static void Notified()
{
std::mutex mtx = {};
std::condition_variable cv = {};
bool signaled = false;
const auto wait_thread = [&]()
{
std::unique_lock<std::mutex> lk(mtx);
std::cout << "Wait" << std::endl;
cv.wait(lk, [&]() { return signaled; });
std::cout << "Signal" << std::endl;
};
const auto signal_thread = [&]()
{
{
std::lock_guard<std::mutex> sync(mtx);
signaled = true;
}
cv.notify_one();
std::cout << "Notify" << std::endl;
};
;
std::thread t1(signal_thread);
std::this_thread::sleep_for(1000ms);
std::thread t2(wait_thread);
std::this_thread::sleep_for(1000ms);
t1.join();
t2.join();
}
Notifed 버전에서 signal을 주는 스레드에서 bool signaled 변수 역시 동기화로 보호되는 것에 유의한다. 이렇게 한다면 Pred를 실행할 때 signaled 변수가 바뀔 일이 없다는 것을 보장한다.
생산자-소비자 패턴
유니크 락과 조건 변수를 활용하여 보이는 최종 생산자-소비자 코드 패턴은 다음과 같다.
std::mutex mtx = {};
std::condition_variable cv = {};
// 생산자 패턴
void Produce(int product)
{
const auto queue = [&]()
{
std::lock_guard<std::mutex> sync(mtx);
waiting_list.push(product);
};
queue();
cv.notify_one();
}
// 소비자 패턴
void Consume()
{
while (!Exit)
{
std::unique_lock<std::mutex> lk(mtx);
cv.wait(lk, []() { return Exit || waiting_list.size() > 0; })
if (Exit)
return;
// do something
}
}
Helper 클래스 만들기
AutoResetEvent 클래스
구글을 검색하여 C#의 AutoResetEvent를 C++ 표준 라이브러리를 이용하여 구현한 것을 사용한다.
위 클래스는 시뮬레이션 중 작업을 큐하고 마칠 때 알람을 받기 위해 편의 기능으로 사용한다.
https://docs.microsoft.com/en-us/dotnet/api/system.threading.autoresetevent?view=net-6.0
#pragma once
#ifndef _AUTORESETEVENT_H_
#define _AUTORESETEVENT_H_
#include <mutex>
#include <condition_variable>
#include <thread>
#include <stdio.h>
class AutoResetEvent
{
public:
explicit AutoResetEvent(bool initial = false)
: Flag(initial)
{
}
void Set()
{
std::lock_guard<std::mutex> _(SyncRoot);
Flag = true;
Signal.notify_one();
}
void Reset()
{
std::lock_guard<std::mutex> _(SyncRoot);
Flag = false;
}
void WaitOne()
{
std::unique_lock<std::mutex> lk(SyncRoot);
Signal.wait(lk, [this]() { return Flag; });
Flag = false; // waiting resets the flag
}
bool WaitOne(int milliseconds)
{
std::unique_lock<std::mutex> lk(SyncRoot);
bool signaled = Signal.wait_for(lk, std::chrono::milliseconds(milliseconds), [this]() { return Flag; });
if (signaled == false)
return false;
Flag = false; // waiting resets the flag
return true;
}
private:
AutoResetEvent(const AutoResetEvent&);
AutoResetEvent& operator=(const AutoResetEvent&) = delete; // non-copyable
bool Flag;
std::mutex SyncRoot;
std::condition_variable Signal;
};
#endif // !_AUTORESETEVENT_H_
ISingleton 싱글톤 패턴
싱글톤 패턴을 상속하여 구현하는 인터페이스이다.
#pragma once
#include <memory>
#ifndef _Singleton_H_
#define _Singleton_H_
template<typename T>
class ISingleton
{
private:
static std::unique_ptr<T> Instance;
protected:
ISingleton() = default;
public:
virtual ~ISingleton() = default;
ISingleton(const ISingleton&) = delete;
ISingleton& operator=(const ISingleton&) = delete;
static T* Get()
{
if (Instance == nullptr)
{
static std::once_flag InitFlag = {};
std::call_once(InitFlag, []() { Instance.reset(new T()); });
}
return Instance.get();
}
};
template<typename T>
std::unique_ptr<T> ISingleton<T>::Instance = {};
#endif // ! _Singleton_H_
Random 클래스
성능 고려 없이 필요 함수만 구현한 C#의 Random을 모방한 클래스이다.
#pragma once
#ifndef _RANDOM_H_
#define _RANDOM_H_
#include <iostream>
#include <cstdio>
#include <random>
#include <array>
#include <assert.h>
#include <memory.h>
class Random
{
private:
class RandomDevice
{
private:
std::random_device device;
std::unique_ptr<std::seed_seq> seeds;
public:
RandomDevice()
{
auto seed_data = std::array<int, std::mt19937::state_size>{};
std::generate(std::begin(seed_data), std::end(seed_data), std::ref(device));
seeds = std::make_unique<std::seed_seq>(std::begin(seed_data), std::end(seed_data));
}
std::mt19937 Generate()
{
return std::mt19937(*seeds);
}
};
private:
std::mt19937 engine;
std::uniform_real_distribution<float> sample_dist{ 0.f, 1.f };
public:
Random()
{
static RandomDevice device;
engine = device.Generate();
}
int Next(int maxValue)
{
std::uniform_int_distribution<int> int_dist(0, maxValue - 1);
return int_dist(engine);
}
int Next(int minValue, int maxValue)
{
assert(minValue < maxValue);
std::uniform_int_distribution<int> int_dist(minValue, maxValue - 1);
return int_dist(engine);
}
int NextBool()
{
return sample_dist(engine) > 0.5f;
}
float Sample()
{
return sample_dist(engine);
}
template<class Iter>
void Shuffle(Iter first, Iter last)
{
std::shuffle(first, last, engine);
}
};
#endif // ! _RANDOM_H_
은행 창구 시뮬레이션 프로그램 코드
#pragma once
#ifndef _CUSTOMERSERVICE_H_
#define _CUSTOMERSERVICE_H_
#include "define.h"
#include "ISingleton.h"
#include "AutoResetEvent.h"
using namespace std::chrono_literals;
class CustomerService : public ISingleton<CustomerService>
{
friend class ISingleton<CustomerService>;
using EventObject = std::shared_ptr<AutoResetEvent>;
private:
class Request
{
private:
enum class RequestStatus
{
Created,
Running,
Finished
};
RequestStatus status;
EventObject eventObj;
int waitIdx;
public:
Request(int _waitIdx, const EventObject& _eventObj)
:
status(RequestStatus::Created),
eventObj(_eventObj),
waitIdx(_waitIdx)
{}
int GetIdx() const { return waitIdx; }
void Start()
{
status = RequestStatus::Running;
// eventObj->Reset();
}
void End()
{
status = RequestStatus::Finished;
eventObj->Set();
}
void Wait()
{
eventObj->WaitOne();
}
};
class WaitingLine
{
private:
std::mutex SyncRoot;
std::queue<int> Waitings;
public:
void Push(int waitIdx)
{
std::lock_guard<std::mutex> _(SyncRoot);
Waitings.push(waitIdx);
}
bool TryPop(int& waitIdx)
{
std::lock_guard<std::mutex> _(SyncRoot);
if (Waitings.empty())
return false;
waitIdx = Waitings.front();
Waitings.pop();
return true;
}
};
private:
constexpr static int NumOfWorkers = 5;
bool ExitFlag = false;
int WaitIndex = 0;
std::mutex SyncLog;
std::mutex SyncTicket;
std::mutex SyncTask;
std::condition_variable SignalTask;
std::vector<std::thread> Workers;
std::queue<int> Waitings;
std::queue<Request> Requests;
std::vector<std::string> Logs;
protected:
CustomerService()
{
for (int workerIdx = 0; workerIdx < NumOfWorkers; ++workerIdx)
{
Workers.emplace_back([this, workerIdx]() { Execute(workerIdx); });
}
std::this_thread::sleep_for(1s);
}
void Execute(int workerIdx)
{
Random rnd = {};
while (!ExitFlag)
{
std::unique_lock<std::mutex> lk(SyncTask);
SignalTask.wait(lk, [this] { return ExitFlag || Requests.size() > 0; });
if (ExitFlag)
return;
// Where is your wait ticket?
auto request = std::move(Requests.front());
Requests.pop();
lk.unlock();
// How can I help you?
request.Start();
auto takeTime = rnd.Next(100, 200);
std::this_thread::sleep_for(std::chrono::milliseconds(takeTime));
// Bye
request.End();
Log(Format("[Served] TicketId : {0} From {1} Window\n", request.GetIdx(), workerIdx));
}
}
int GenerateTicket()
{
std::lock_guard<std::mutex> sync(SyncTicket);
int ticketed = WaitIndex++;
Waitings.push(ticketed);
return ticketed;
}
bool CheckTicket(const int& waitIdx)
{
std::lock_guard<std::mutex> sync(SyncTicket);
if (Waitings.empty())
{
return false;
}
if (Waitings.front() != waitIdx)
{
return false;
}
Waitings.pop();
return true;
}
void WaitForService(const int& waitIdx, const EventObject& eventObj)
{
const auto makeRequest = [&]()
{
std::lock_guard<std::mutex> _(SyncTask);
return Requests.emplace(waitIdx, eventObj);
};
auto request = makeRequest();
Log(Format("[Waiting] TicketId : {0}\n", waitIdx));
SignalTask.notify_one();
request.Wait();
}
private:
void Log(std::string log)
{
std::lock_guard<std::mutex> sync(SyncLog);
Logs.push_back(std::move(log));
}
int GetTicket(int threadId)
{
int ticketed = GenerateTicket();
Log(Format("[Ticketed] TicketId : {0} Thread : {1}\n", ticketed, threadId));
return ticketed;
}
bool TryGetService(int waitIdx, const EventObject& eventObj)
{
if (CheckTicket(waitIdx) == false)
return false;
WaitForService(waitIdx, eventObj);
return true;
}
public:
virtual ~CustomerService()
{
ExitFlag = true;
SignalTask.notify_all();
for (auto& worker : Workers)
{
worker.join();
}
}
std::vector<std::string> Flush()
{
std::lock_guard<std::mutex> sync(SyncLog);
return std::move(Logs);
}
void Simulate(int numCustomers, int numOfConcurrency)
{
int eachCustomers = numCustomers / numOfConcurrency;
int remainCustomers = numCustomers - eachCustomers * (numOfConcurrency - 1);
std::vector<std::thread> ticketingThreads;
std::vector<std::thread> waitingThreads;
std::vector<std::shared_ptr<WaitingLine>> waitingLines;
for (int threadId = 0; threadId < numOfConcurrency; ++threadId)
{
auto waitingLine = std::make_shared<WaitingLine>();
waitingLines.push_back(waitingLine);
int numOfAssignedCustomers = threadId == numOfConcurrency - 1 ? remainCustomers : eachCustomers;
ticketingThreads.emplace_back([this, numOfAssignedCustomers, waitingLine, threadId]()
{
Random rnd = {};
for (int progress = 0; progress < numOfAssignedCustomers; ++progress)
{
waitingLine->Push(GetTicket(threadId));
auto duration = rnd.Next(25, 75);
std::this_thread::sleep_for(std::chrono::milliseconds(duration));
}
});
waitingThreads.emplace_back([this, numOfAssignedCustomers, waitingLine]()
{
std::shared_ptr<AutoResetEvent> eventObj = std::make_shared<AutoResetEvent>();
for (int progress = 0, waitId = -1; progress < numOfAssignedCustomers;)
{
if (waitId == -1 && waitingLine->TryPop(waitId) == false)
{
std::this_thread::sleep_for(50ms);
continue;
}
if (TryGetService(waitId, eventObj) == false)
{
std::this_thread::sleep_for(50ms);
continue;
}
++progress;
waitId = -1;
}
});
}
for (auto& waitThread : waitingThreads)
{
waitThread.join();
}
for (auto& ticketingThread : ticketingThreads)
{
ticketingThread.join();
}
}
};
#endif // !_CUSTOMERSERVICE_H_
실행 결과
int main()
{
CustomerService::Get()->Simulate(10, 3);
auto results = CustomerService::Get()->Flush();
for (auto& result : results)
{
std::cout << result;
}
return 0;
}
참고: 모던 C++ 챌린지(마리우스 반실라 지음)에 수록된 문제
'Advanced C++' 카테고리의 다른 글
[C++] libcurl (2) 공개 API를 사용해보자 : HTTP GET (1) | 2022.04.17 |
---|---|
[C++] libcurl (1) 다운로드 및 설치 (0) | 2022.04.17 |
[C++] C++20 동시성 컨테이너를 사용하여 ThreadPool 설계하기 (0) | 2022.03.26 |
[C++] 정규식 표현 std::regex으로 문자열 찾기 (0) | 2022.02.07 |
[C++] 메모리 관리 (4) 정적 메모리 풀 (0) | 2021.10.10 |