[Boost] thread
boost thread 사용법을 알아본다.
대부분의 기능은 C++11에 표준화되어 <thread> <atomic> <mutex> <condition_variable> 헤더에 포함되어 있다.
필요 헤더
#include <boost/thread.hpp>
thread 사용하기
static bool is_first_run() { return true; }
static void fill_file(char fill_char, std::size_t size, const char* filename);
static void p_example()
{
if (is_first_run()) {
boost::thread(boost::bind(&fill_file, 0, 8 * 1024 * 1024, "save_file.txt")).detach();
}
}
실행하고자 하는 함수를 전달하여 스레드를 만들어 실행하도록 한다.
detach()는 프로그램 종료 시 같이 종료하는 스레드이다.
공유 자원 동기화
boost::mutex로 스레드 실행을 하나로 제한한다. C++11에 도입된 mutex와 같다.
static int i_val;
static boost::mutex i_mtx;
static void do_inc()
{
for (int i = 0; i < 1000000; ++i)
{
boost::lock_guard<boost::mutex> lk(i_mtx);
++i_val;
}
}
static void make_threads_inc()
{
boost::thread t1(do_inc);
boost::thread t2(do_inc);
boost::thread t3(do_inc);
t1.join();
t2.join();
t3.join();
}
작업 큐 구현
큐에 작업 단위의 개체를 집어넣고 하나씩 빼서 실행할 수있는 작업 큐를 구현할 수 있다.
boost::unique_lock, boost:ccondition_variable, boost::mutex를 사용한다.
class worker_queue
{
public:
using task_type = boost::function<void()>;
private:
std::deque<task_type> _tasks;
boost::mutex _tasks_mutex;
boost::condition_variable _cond;
public:
void push_task(const task_type& task)
{
boost::unique_lock<boost::mutex> lk(_tasks_mutex);
_tasks.push_back(task);
lk.unlock();
_cond.notify_one();
}
task_type try_pop_task() {
task_type ret;
boost::lock_guard<boost::mutex> lk(_tasks_mutex);
if (!_tasks.empty()) {
ret = _tasks.front();
_tasks.pop_front();
}
return ret;
}
task_type pop_task() {
boost::unique_lock<boost::mutex> lk(_tasks_mutex);
while (_tasks.empty())
{
_cond.wait(lk);
}
task_type ret = _tasks.front();
_tasks.pop_front();
return ret;
}
};
notify_one()을 호출할 때 미리 락을 해제해고 호출하는 것이 성능상 좋다. 락이 잠금되어 있는 상태에서도 호출 가능하지만 락을 획득하려는 스레드가 실패하고 블락되었다가 다시 잠금을 획득하기 때문이다.
void push_task(const task_type& task)
{
boost::unique_lock<boost::mutex> lk(_tasks_mutex);
_tasks.push_back(task);
lk.unlock();
_cond.notify_one();
}
다중 읽기 - 단일 쓰기 잠금
공유 자원이 많은 읽기를 요구하고 쓰기 정도는 빈번하지 않다면 boost::shared_mutex의 사용을 고려해본다.
struct user_info {
std::string address;
unsigned short age;
};
class users_online {
using mutex_t = boost::shared_mutex;
mutable mutex_t _users_mutex;
std::unordered_map<std::string, user_info> _users;
public:
bool is_online(const std::string& username) const {
boost::shared_lock<mutex_t> lk(_users_mutex);
return _users.find(username) != _users.end();
}
std::string get_address(const std::string& username) const {
boost::shared_lock<mutex_t> lk(_users_mutex);
return _users.at(username).address;
}
void set_online(const std::string& username, user_info&& data)
{
boost::lock_guard<mutex_t> lk(_users_mutex);
_users.emplace(username, std::move(data));
}
};
스레드 고유 변수
boost::thread_specific_ptr<T>
스레드 당 고유 커넥션을 유지하여 데이터 조회 작업을 할 수 있도록 한다.
스레드 당 하나의 변수를 초기화하고 사용하기 위해 스레드 안전한 방식으로 방법을 제공한다.
class connection : boost::noncopyable {
public:
void open();
void send_result(int result);
int _open_count;
connection() : _open_count(0) {}
boost::thread_specific_ptr<connection> connection_ptr;
connection& get_connection() {
connection* p = connection_ptr.get();
if (!p)
{
connection_ptr.reset(new connection());
p = connection_ptr.get();
p->open();
}
return *p;
}
};
C++11에서 thread_local 키워드와 unique_ptr을 사용한다면 대체하여 구현 가능하다.
connection& get_connection2()
{
thread_local std::unique_ptr<connection> connection_ptr2;
connection* p = connection_ptr2.get();
if (!p)
{
connection_ptr.reset(new connection());
p = connection_ptr.get();
p->open();
}
return *p;
}
스레드 인터럽트
interrupt()
스레드가 처리하는 작업을 중지할 때 사용할 수 있는 방법이다.
C++ 표준에는 포함되지 않았다. 대신 bool 변수를 두어 구현하는 것으로 똑같이 가능하다.
static void do_work()
{
try
{
while (!is_stop)
{
// throw if thread is interrupted
boost::this_thread::interruption_point();
// do work
}
}
catch (const boost::thread_interrupted& e)
{
cout << "thread execution ends" << endl;
}
}
static bool need_to_be_idle() { return true; }
static void thread_interrupt()
{
boost::thread worker(&do_work);
// wait
// do something...
if (need_to_be_idle())
{
worker.interrupt();
}
worker.join();
}
스레드 그룹
boost::thread_group
특정 함수를 실행하는 스레드를 그룹으로 묶어 생명 주기를 같이 관리할 수 있다.
static void do_something() {}
static void thread_groups()
{
boost::thread_group threads;
for (int i = 0; i < 10; ++i)
{
threads.create_thread(&do_something);
}
threads.join_all();
}
스레드 전역으로 단 한번 호출되어야하는 초기화 혹은 함수
boost::call_once, boost::once_flag
boost::once_flag 값을 확인한 후 BOOST_ONCE_INIT 값이라면 boost::call_once에 전달된 함수를 호출하게 된다.
#include <boost/thread/once.hpp>
struct postprocessor
{
using answer_t = std::vector<std::string>;
private:
mutable boost::once_flag _default_flag;
mutable answer_t _default;
public:
postprocessor()
:
_default_flag(BOOST_ONCE_INIT),
_default()
{}
static answer_t read_defaults() { return {}; }
answer_t act(const std::string& in) const {
answer_t ret;
if (in.empty()) {
boost::call_once(_default_flag, [this]() {
this->_default = read_defaults();
});
return _default;
}
return ret;
}
};
여러 뮤텍스 잠그기
ABBA 문제
A락과 B락을 획득하려는 함수가 있다하자.
Func 1: A락을 획득하고 B락을 획득하는 코드이다.
Func 2: B락을 획득하고 A락을 획득하는 코드이다.
스레드 1과 스레드 2가 동시에 각각 Func 1과 Func 2를 호출하려고 한다.
스레드 1 : A 락을 획득한 상태에서 B락을 획득하려고 한다.
스레드 2 : B 락을 획득한 상태에서 A락을 획득하려고 한다.
이 상황에서 스레드 1은 B락을 획득할 수 없고 스레드 2도 마찬가지로 A락을 획득할 수 없다. 이는 무한 대기에 빠지게 되고 데드락 상황으로 이어진다.
user 개체가 소유하고 있는 items를 다른 user와 서로 swap하는 상황
class user
{
private:
using item_t = int;
boost::mutex _items_mutex;
std::vector<item_t> _items;
public:
void exchange_items(user& u)
{
// might cause ABBA problem
boost::lock_guard<boost::mutex> l0(_items_mutex);
boost::lock_guard<boost::mutex> l1(u._items_mutex);
_items.swap(u._items);
}
};
자신의 mutex를 획득하고 상대방의 mutex를 획득하는 exchange_items는 ABBA 문제를 야기할 수 있다.
해결방법
모든 함수에서 락을 획득하는 순서를 획일화 한다.
boost::make_unique_locks는 주어진 락에 대하여 획일화된 순서로 락을 얻는 것을 보장한다.
#include <boost/thread/lock_factories.hpp>
void exchange_items_tuple_lock(user& u)
{
using lock_t = boost::unique_lock<boost::mutex>;
// std::tuple<lock_t, lock_t> l = boost::make_unique_locks(_items_mutex, u._items_mutex);
auto l = boost::make_unique_locks(_items_mutex, u._items_mutex);
_items.swap(u._items);
}
make_unique_locks에서 가변 템플릿을 사용하지 않는 boost::lock을 사용하여 처리하기
#include <boost/thread/locks.hpp>
void exchange_items_protable(user& u)
{
using lock_t = boost::unique_lock<boost::mutex>;
lock_t l0(_items_mutex, boost::defer_lock);
lock_t l1(u._items_mutex, boost::defer_lock);
boost::lock(l0, l1);
_items.swap(u._items);
}
C++ 17에서는 다음과 같이 처리할 수 있다.
void exchange_items_cxx17(user& u)
{
std::scoped_lock l(u._items_mutex, _items_mutex);
_items.swap(u._items);
}