Advanced C++

[C++] 멀티 스레드 응용 프로그램 : 은행 창구 시스템 구현하기

로파이 2022. 3. 27. 22:19

은행 창구에서 고객들을 맞이하는 시스템을 구현하고 시뮬레이션 해본다.

다음 요구사항을 만족하는 프로그램을 작성한다.
1. 은행 손님들은 임의의 시간에 은행에 들어와 대기표를 발급 받는다.

2. 대기표를 받은 순서대로 빈 창구 있는 경우 바로 서비스를 받을 수 있게 한다.

3. 각 창구에서 일하고 있는 은행원은 은행 업무 서비스를 제공하고 대기 번호가 하나라도 있는 경우 업무를 맡을 수 있게 한다.

4. 각 창구는 한 명의 은행원이 맡으며 총 은행원의 수는 정해져 있다.

5. 이제 대기표 발급과 창구로의 은행 업무 할당과 관련된 로직을 설계하고 다중 스레드로 시뮬레이션 해본다.

 

배경

동기화 Synchronization

- 대기표

은행에 방문한 손님은 다중 스레드로 표현되는 임의의 시점에서 동시에 대기표를 발급 받을 수 있다. 대기표는 각 손님에게 유니크한 정수형 번호를 발급해야하며 같은 번호가 둘 이상의 손님에서 발급되거나 건너뛰어서 발급되면 안된다.

 

- 대기 리스트

각 창구에 매칭되는 손님은 한 명이어야한다. 대기 번호로 줄 지어진 리스트(큐)가 있다면 그러한 리스트에서 다음 배정받기 위해 꺼내지거나 새로운 대기번호가 큐잉되기 위해서는 동기화되어야한다.

 

- 로그

프로그램이 동작하는 동안 시뮬레이션의 결과를 중간 중간 확인할 수 있도록 스트링 메시지를 저장해둔다. 이 또한 여러 스레드에서 접근가능하므로 스레드 안전한 로깅을 제공해야한다.

 

위와 같은 동기화를 위해 std::mutex와 std::lock_guard를 이용하여 적절히 구현하면 된다.

 

생산자-소비자 문제 Producer-Consumer Problem

위 문제의 경우 생산자-소비자 문제와 비슷하다. 손님은 대기표를 받아 처리해야할 제품을 만드는 생산자 역할이고 은행원은 각 대기표를 받아 은행 업무를 처리하는 소비자 역할이다.

 

생산자-소비자 문제를 구현하기 위해 각 은행원의 행동은 스레드 풀로 만들어준다. 스레드 풀으로 표현되는 각 은행원들은 자신의 스레드에서 받을 수 있는 대기 번호가 생길때까지 블록(block) 상태에 놓인다.

 

자신의 창구가 비어있는 상태이고 대기 리스트에 번호가 있다면 자신의 스레드를 꺠우고 맨 앞 번호를 꺼내어 은행 업무를 처리하도록 한다.

 

std::unique_lock와 std::condition_variable
<mutex> 헤더에 정의되어 있는 unique_lock을 보자.

// CLASS TEMPLATE unique_lock
template <class _Mutex>
class unique_lock { // whizzy class with destructor that unlocks mutex
public:
    using mutex_type = _Mutex;

    // CONSTRUCT, ASSIGN, AND DESTROY
    unique_lock() noexcept : _Pmtx(nullptr), _Owns(false) {}

    _NODISCARD_CTOR explicit unique_lock(_Mutex& _Mtx)
        : _Pmtx(_STD addressof(_Mtx)), _Owns(false) { // construct and lock
        _Pmtx->lock();
        _Owns = true;
    }

explicit 생성자로 정의된 unique_lock의 경우 mutex를 레퍼런스로 받아 잠금을 수행한 뒤 객체를 생성한다.

락 가드(std::lock_guard)와 같이 소멸자에서 소유 중인 경우 잠금 해제를 한다. 

 ~unique_lock() noexcept {
        if (_Owns) {
            _Pmtx->unlock();
        }
    }

락 가드와 다른 특징은 유니크 락을 통해 획득한 잠금을 도중에 풀거나 다시 잠글 수 있다는 점이다. 또한 try_lock이나 try_lock_for 등 블록없는 잠금 시도 또한 가능하다.


std::condition_variable 조건 변수
생산자-소비자 프로그램에서 소비자에게 처리할 product가 도착했다는 신호(signal)를 알리기 위해 사용한다. 유니크 락은 조건 변수의 wait() 함수가 유니크 락을 인자로 받기 때문에 유니크 락을 사용해야하는 이유이기도 하다.

void wait(unique_lock<mutex>& _Lck) { // wait for signal
        // Nothing to do to comply with LWG-2135 because std::mutex lock/unlock are nothrow
        _Cnd_wait(_Mycnd(), _Lck.mutex()->_Mymtx());
    }

template <class _Predicate>
void wait(unique_lock<mutex>& _Lck, _Predicate _Pred) { // wait for signal and test predicate
    while (!_Pred()) {
        wait(_Lck);
    }
}

주로 사용되는 버전은 아래 함수처럼 Predicate 함수를 하나 받는 버전인데 조건 변수는 이상적으로는 notify_one()/notify_all()과 같은 알림에 의해 깨어나야 하지만 가짜 알림(spurious wakeup)이 일어날 수도 있다고 언급되어 있다. 따라서 실제 깨어난 스레드가 정말로 일어나야하는 상황인 지 체크하기 위한 이유도 있다.

std::mutex mtx = {};
std::condition_variable cv = {};
bool Signaled = false;

void BlockThisThreadUntilWakeup()
{
    // 락을 획득한다.
    std::unique_lock<std::mutex> lk(mtx);
	
    // Predicate가 false인 경우 획득한 락을 해제하고 스레드를 블락한다.
    cv.wait(lk, [&]() { return Signaled });
    
    // 이후 영역은 조건 변수에 의해 mtx 뮤텍스 객체가 락을 획득한 상태로 진행된다.
    // 유니크 락을 통해 락을 해제할 수 있다.
    lk.unlock();
}

조건 변수의 wait()을 이용하면 Predicate 조건을 검사하여 깨어나는 상황이 아닌 경우 락을 풀고 스레드를 블록시킨다. 
만약 정상적으로 깨어났다면, 유니크 락을 통해 락을 획득한 상태가 되고 이후 영역에서 필요하다면 다시 풀 수도 있다.

 

조건 변수 사용 주의 사항

Predicate를 사용하는 주된 이유는 따로 있는데, Predicate가 없다면 조건 변수는 반드시 wait -> signal 순서로 일어나야 알림을 받을 수 있다.

static void UnNotified()
{
	std::mutex mtx = {};
	std::condition_variable cv = {};

	const auto wait_thread = [&]()
	{
		std::unique_lock<std::mutex> lk(mtx);
		std::cout << "Wait" << std::endl;

		cv.wait(lk);

		std::cout << "Signal" << std::endl;
	};

	const auto signal_thread = [&]()
	{
		cv.notify_one();

		std::cout << "Notify" << std::endl;
	};
;

	std::thread t1(signal_thread);

	std::this_thread::sleep_for(1000ms);

	std::thread t2(wait_thread);

	std::this_thread::sleep_for(1000ms);

	t1.join();
	t2.join();
}

위 함수를 실행하면 문제는 Notify가 먼저 나오는 경우 Signal 결과를 영원히 받을 수 없다는 것이다.
조건 변수의 wait()의 함수 정의를 다시 살펴보면 락을 획득한 상태에서 while(!Pred())로 한번 조건을 검사한다는 것이다. 따라서 깨어날 상황이 이미 만족되어 있다면 조건 변수가 락을 해제하고 스레드를 블록시키는 wait(_Lck)를 실행하지 않는다.

Predicate를 이용한 wait() 버전

static void Notified()
{
	std::mutex mtx = {};
	std::condition_variable cv = {};
	bool signaled = false;

	const auto wait_thread = [&]()
	{
		std::unique_lock<std::mutex> lk(mtx);
		std::cout << "Wait" << std::endl;

		cv.wait(lk, [&]() { return signaled; });

		std::cout << "Signal" << std::endl;
	};

	const auto signal_thread = [&]()
	{
		{
			std::lock_guard<std::mutex> sync(mtx);
			signaled = true;
		}
	
		cv.notify_one();

		std::cout << "Notify" << std::endl;
	};
	;

	std::thread t1(signal_thread);

	std::this_thread::sleep_for(1000ms);

	std::thread t2(wait_thread);

	std::this_thread::sleep_for(1000ms);

	t1.join();
	t2.join();
}

Notifed 버전에서 signal을 주는 스레드에서 bool signaled 변수 역시 동기화로 보호되는 것에 유의한다. 이렇게 한다면 Pred를 실행할 때 signaled 변수가 바뀔 일이 없다는 것을 보장한다.

생산자-소비자 패턴
유니크 락과 조건 변수를 활용하여 보이는 최종 생산자-소비자 코드 패턴은 다음과 같다.

std::mutex mtx = {};
std::condition_variable cv = {};

// 생산자 패턴
void Produce(int product)
{
    const auto queue = [&]()
    {
        std::lock_guard<std::mutex> sync(mtx);
        waiting_list.push(product);
    };
    
    queue();
    cv.notify_one();
}

// 소비자 패턴
void Consume()
{
    while (!Exit)
    {
    	std::unique_lock<std::mutex> lk(mtx);
    
    	cv.wait(lk, []() { return Exit || waiting_list.size() > 0; })
        
        if (Exit)
            return;
        
        // do something
    }
}



Helper 클래스 만들기


AutoResetEvent 클래스

구글을 검색하여 C#의 AutoResetEvent를 C++ 표준 라이브러리를 이용하여 구현한 것을 사용한다.
위 클래스는 시뮬레이션 중 작업을 큐하고 마칠 때 알람을 받기 위해 편의 기능으로 사용한다.
https://docs.microsoft.com/en-us/dotnet/api/system.threading.autoresetevent?view=net-6.0

 

AutoResetEvent Class (System.Threading)

Represents a thread synchronization event that, when signaled, resets automatically after releasing a single waiting thread. This class cannot be inherited.

docs.microsoft.com

더보기
#pragma once
#ifndef _AUTORESETEVENT_H_
#define _AUTORESETEVENT_H_
#include <mutex>
#include <condition_variable>
#include <thread>
#include <stdio.h>

class AutoResetEvent
{
public:
    explicit AutoResetEvent(bool initial = false)
        : Flag(initial)
    {
    }

    void Set()
    {
        std::lock_guard<std::mutex> _(SyncRoot);
        Flag = true;
        Signal.notify_one();
    }

    void Reset()
    {
        std::lock_guard<std::mutex> _(SyncRoot);
        Flag = false;
    }

    void WaitOne()
    {
        std::unique_lock<std::mutex> lk(SyncRoot);
        Signal.wait(lk, [this]() { return Flag; });
        Flag = false; // waiting resets the flag
    }

    bool WaitOne(int milliseconds)
    {
        std::unique_lock<std::mutex> lk(SyncRoot);
        bool signaled = Signal.wait_for(lk, std::chrono::milliseconds(milliseconds), [this]() { return Flag; });
        if (signaled == false)
            return false;

        Flag = false; // waiting resets the flag
        return true;
    }

private:
    AutoResetEvent(const AutoResetEvent&);
    AutoResetEvent& operator=(const AutoResetEvent&) = delete; // non-copyable
    bool Flag;
    std::mutex SyncRoot;
    std::condition_variable Signal;
};

#endif // !_AUTORESETEVENT_H_


ISingleton 싱글톤 패턴

싱글톤 패턴을 상속하여 구현하는 인터페이스이다.

더보기
#pragma once
#include <memory>
#ifndef  _Singleton_H_
#define _Singleton_H_

template<typename T>
class ISingleton
{
private:
	static std::unique_ptr<T> Instance;

protected:
	ISingleton() = default;

public:
	virtual ~ISingleton() = default;
	ISingleton(const ISingleton&) = delete;
	ISingleton& operator=(const ISingleton&) = delete;

	static T* Get()
	{
		if (Instance == nullptr)
		{
			static std::once_flag InitFlag = {};
			std::call_once(InitFlag, []() { Instance.reset(new T()); });
		}

		return Instance.get();
	}
};

template<typename T>
std::unique_ptr<T> ISingleton<T>::Instance = {};

#endif // ! _Singleton_H_


Random 클래스
성능 고려 없이 필요 함수만 구현한 C#의 Random을 모방한 클래스이다.

더보기
#pragma once
#ifndef  _RANDOM_H_
#define _RANDOM_H_

#include <iostream>
#include <cstdio>
#include <random>
#include <array>
#include <assert.h>
#include <memory.h>

class Random
{
private:
	class RandomDevice
	{
	private:
		std::random_device device;
		std::unique_ptr<std::seed_seq> seeds;
	public:
		RandomDevice()
		{
			auto seed_data = std::array<int, std::mt19937::state_size>{};
			std::generate(std::begin(seed_data), std::end(seed_data), std::ref(device));
			seeds = std::make_unique<std::seed_seq>(std::begin(seed_data), std::end(seed_data));
		}

		std::mt19937 Generate()
		{
			return std::mt19937(*seeds);
		}
	};
private:
	std::mt19937 engine;
	std::uniform_real_distribution<float> sample_dist{ 0.f, 1.f };

public:
	Random()
	{
		static RandomDevice device;
		engine = device.Generate();
	}

	int Next(int maxValue)
	{
		std::uniform_int_distribution<int> int_dist(0, maxValue - 1);
		return int_dist(engine);
	}

	int Next(int minValue, int maxValue)
	{
		assert(minValue < maxValue);
		std::uniform_int_distribution<int> int_dist(minValue, maxValue - 1);
		return int_dist(engine);
	}

	int NextBool()
	{
		return sample_dist(engine) > 0.5f;
	}

	float Sample()
	{
		return sample_dist(engine);
	}

	template<class Iter>
	void Shuffle(Iter first, Iter last)
	{
		std::shuffle(first, last, engine);
	}
};

#endif // ! _RANDOM_H_

 


은행 창구 시뮬레이션 프로그램 코드

더보기
#pragma once

#ifndef _CUSTOMERSERVICE_H_
#define _CUSTOMERSERVICE_H_

#include "define.h"
#include "ISingleton.h"
#include "AutoResetEvent.h"

using namespace std::chrono_literals;

class CustomerService : public ISingleton<CustomerService>
{
	friend class ISingleton<CustomerService>;
	using EventObject = std::shared_ptr<AutoResetEvent>;

private:
	class Request
	{
	private:
		enum class RequestStatus
		{
			Created,
			Running,
			Finished
		};
		RequestStatus status;
		EventObject eventObj;
		int			waitIdx;

	public:
		Request(int _waitIdx, const EventObject& _eventObj)
			:
			status(RequestStatus::Created),
			eventObj(_eventObj),
			waitIdx(_waitIdx)
		{}

		int GetIdx() const { return waitIdx; }

		void Start()
		{
			status = RequestStatus::Running;
			// eventObj->Reset();
		}

		void End()
		{
			status = RequestStatus::Finished;
			eventObj->Set();
		}

		void Wait()
		{
			eventObj->WaitOne();
		}
	};

	class WaitingLine
	{
	private:
		std::mutex SyncRoot;
		std::queue<int> Waitings;

	public:
		void Push(int waitIdx)
		{
			std::lock_guard<std::mutex> _(SyncRoot);
			Waitings.push(waitIdx);
		}

		bool TryPop(int& waitIdx)
		{
			std::lock_guard<std::mutex> _(SyncRoot);
			if (Waitings.empty())
				return false;

			waitIdx = Waitings.front();
			Waitings.pop();
			return true;
		}
	};

private:
	constexpr static int NumOfWorkers = 5;
	bool ExitFlag = false;
	int WaitIndex = 0;

	std::mutex SyncLog;
	std::mutex SyncTicket;
	std::mutex SyncTask;
	std::condition_variable SignalTask;
	std::vector<std::thread> Workers;

	std::queue<int> Waitings;
	std::queue<Request> Requests;
	std::vector<std::string> Logs;

protected:
	CustomerService()
	{
		for (int workerIdx = 0; workerIdx < NumOfWorkers; ++workerIdx)
		{
			Workers.emplace_back([this, workerIdx]() { Execute(workerIdx); });
		}

		std::this_thread::sleep_for(1s);
	}

	void Execute(int workerIdx)
	{
		Random rnd = {};
		while (!ExitFlag)
		{
			std::unique_lock<std::mutex> lk(SyncTask);
			SignalTask.wait(lk, [this] { return ExitFlag || Requests.size() > 0; });

			if (ExitFlag)
				return;

			// Where is your wait ticket?
			auto request = std::move(Requests.front());
			Requests.pop();
			lk.unlock();

			// How can I help you?
			request.Start();
			auto takeTime = rnd.Next(100, 200);
			std::this_thread::sleep_for(std::chrono::milliseconds(takeTime));

			// Bye
			request.End();
			Log(Format("[Served] TicketId : {0} From {1} Window\n", request.GetIdx(), workerIdx));
		}
	}

	int GenerateTicket()
	{
		std::lock_guard<std::mutex> sync(SyncTicket);
		int ticketed = WaitIndex++;
		Waitings.push(ticketed);
		return ticketed;
	}

	bool CheckTicket(const int& waitIdx)
	{
		std::lock_guard<std::mutex> sync(SyncTicket);
		if (Waitings.empty())
		{
			return false;
		}
		
		if (Waitings.front() != waitIdx)
		{
			return false;
		}

		Waitings.pop();
		return true;
	}

	void WaitForService(const int& waitIdx, const EventObject& eventObj)
	{
		const auto makeRequest = [&]()
		{
			std::lock_guard<std::mutex> _(SyncTask);
			return Requests.emplace(waitIdx, eventObj);
		};

		auto request = makeRequest();

		Log(Format("[Waiting] TicketId : {0}\n", waitIdx));
		SignalTask.notify_one();
		request.Wait();
	}

private:
	void Log(std::string log)
	{
		std::lock_guard<std::mutex> sync(SyncLog);
		Logs.push_back(std::move(log));
	}

	int GetTicket(int threadId)
	{
		int ticketed = GenerateTicket();
		Log(Format("[Ticketed] TicketId : {0} Thread : {1}\n", ticketed, threadId));

		return ticketed;
	}

	bool TryGetService(int waitIdx, const EventObject& eventObj)
	{
		if (CheckTicket(waitIdx) == false)
			return false;

		WaitForService(waitIdx, eventObj);

		return true;
	}

public:
	virtual ~CustomerService()
	{
		ExitFlag = true;
		SignalTask.notify_all();
		for (auto& worker : Workers)
		{
			worker.join();
		}
	}

	std::vector<std::string> Flush()
	{
		std::lock_guard<std::mutex> sync(SyncLog);
		return std::move(Logs);
	}

	void Simulate(int numCustomers, int numOfConcurrency)
	{
		int eachCustomers = numCustomers / numOfConcurrency;
		int remainCustomers = numCustomers - eachCustomers * (numOfConcurrency - 1);

		std::vector<std::thread> ticketingThreads;
		std::vector<std::thread> waitingThreads;
		std::vector<std::shared_ptr<WaitingLine>> waitingLines;
		for (int threadId = 0; threadId < numOfConcurrency; ++threadId)
		{
			auto waitingLine = std::make_shared<WaitingLine>();
			waitingLines.push_back(waitingLine);
			int numOfAssignedCustomers = threadId == numOfConcurrency - 1 ? remainCustomers : eachCustomers;
			ticketingThreads.emplace_back([this, numOfAssignedCustomers, waitingLine, threadId]()
			{
				Random rnd = {};
				for (int progress = 0; progress < numOfAssignedCustomers; ++progress)
				{
					waitingLine->Push(GetTicket(threadId));
					auto duration = rnd.Next(25, 75);
					std::this_thread::sleep_for(std::chrono::milliseconds(duration));
				}
			});

			waitingThreads.emplace_back([this, numOfAssignedCustomers, waitingLine]()
			{
				std::shared_ptr<AutoResetEvent> eventObj = std::make_shared<AutoResetEvent>();
				for (int progress = 0, waitId = -1; progress < numOfAssignedCustomers;)
				{
					if (waitId == -1 && waitingLine->TryPop(waitId) == false)
					{
						std::this_thread::sleep_for(50ms);
						continue;
					}

					if (TryGetService(waitId, eventObj) == false)
					{
						std::this_thread::sleep_for(50ms);
						continue;
					}

					++progress;
					waitId = -1;
				}
			});
		}

		for (auto& waitThread : waitingThreads)
		{
			waitThread.join();
		}

		for (auto& ticketingThread : ticketingThreads)
		{
			ticketingThread.join();
		}
	}
};

#endif // !_CUSTOMERSERVICE_H_


실행 결과

int main()
{
	CustomerService::Get()->Simulate(10, 3);

	auto results = CustomerService::Get()->Flush();

	for (auto& result : results)
	{
		std::cout << result;
	}

	return 0;
}


참고: 모던 C++ 챌린지(마리우스 반실라 지음)에 수록된 문제