C++/Boost

[Boost] thread

로파이 2022. 8. 21. 23:03

boost thread 사용법을 알아본다.

 

대부분의 기능은 C++11에 표준화되어 <thread> <atomic> <mutex> <condition_variable> 헤더에 포함되어 있다.

 

필요 헤더

#include <boost/thread.hpp>

 

thread 사용하기

static bool is_first_run() { return true; }
static void fill_file(char fill_char, std::size_t size, const char* filename);

static void p_example()
{
    if (is_first_run()) {
        boost::thread(boost::bind(&fill_file, 0, 8 * 1024 * 1024, "save_file.txt")).detach();
    }
}

실행하고자 하는 함수를 전달하여 스레드를 만들어 실행하도록 한다.

detach()는 프로그램 종료 시 같이 종료하는 스레드이다.

 

공유 자원 동기화

boost::mutex로 스레드 실행을 하나로 제한한다. C++11에 도입된 mutex와 같다.

static int i_val;
static boost::mutex i_mtx;
static void do_inc()
{
    for (int i = 0; i < 1000000; ++i)
    {
        boost::lock_guard<boost::mutex> lk(i_mtx);
        ++i_val;
    }
}

static void make_threads_inc()
{
    boost::thread t1(do_inc);
    boost::thread t2(do_inc);
    boost::thread t3(do_inc);

    t1.join();
    t2.join();
    t3.join();
}

 

작업 큐 구현

큐에 작업 단위의 개체를 집어넣고 하나씩 빼서 실행할 수있는 작업 큐를 구현할 수 있다.

boost::unique_lock, boost:ccondition_variable, boost::mutex를 사용한다.

class worker_queue
{
public:
    using task_type = boost::function<void()>;

private:
    std::deque<task_type> _tasks;
    boost::mutex _tasks_mutex;
    boost::condition_variable _cond;

public:
    void push_task(const task_type& task)
    {
        boost::unique_lock<boost::mutex> lk(_tasks_mutex);
        _tasks.push_back(task);
        lk.unlock();

        _cond.notify_one();
    }

    task_type try_pop_task() {
        task_type ret;
        boost::lock_guard<boost::mutex> lk(_tasks_mutex);
        if (!_tasks.empty()) {
            ret = _tasks.front();
            _tasks.pop_front();
        }

        return ret;
    }

    task_type pop_task() {
        boost::unique_lock<boost::mutex> lk(_tasks_mutex);
        while (_tasks.empty())
        {
            _cond.wait(lk);
        }

        task_type ret = _tasks.front();
        _tasks.pop_front();

        return ret;
    }
};

 

notify_one()을 호출할 때 미리 락을 해제해고 호출하는 것이 성능상 좋다. 락이 잠금되어 있는 상태에서도 호출 가능하지만 락을 획득하려는 스레드가 실패하고 블락되었다가 다시 잠금을 획득하기 때문이다.

void push_task(const task_type& task)
{
    boost::unique_lock<boost::mutex> lk(_tasks_mutex);
    _tasks.push_back(task);
    lk.unlock();

    _cond.notify_one();
}

 

다중 읽기 - 단일 쓰기 잠금

공유 자원이 많은 읽기를 요구하고 쓰기 정도는 빈번하지 않다면 boost::shared_mutex의 사용을 고려해본다.

struct user_info {
    std::string address;
    unsigned short age;
};

class users_online {
    using mutex_t = boost::shared_mutex;

    mutable mutex_t _users_mutex;
    std::unordered_map<std::string, user_info> _users;

public:
    bool is_online(const std::string& username) const {
        boost::shared_lock<mutex_t> lk(_users_mutex);
        return _users.find(username) != _users.end();
    }

    std::string get_address(const std::string& username) const {
        boost::shared_lock<mutex_t> lk(_users_mutex);
        return _users.at(username).address;
    }

    void set_online(const std::string& username, user_info&& data)
    {
        boost::lock_guard<mutex_t> lk(_users_mutex);
        _users.emplace(username, std::move(data));
    }
};

 

스레드 고유 변수

boost::thread_specific_ptr<T>

스레드 당 고유 커넥션을 유지하여 데이터 조회 작업을 할 수 있도록 한다.

스레드 당 하나의 변수를 초기화하고 사용하기 위해 스레드 안전한 방식으로 방법을 제공한다. 

class connection : boost::noncopyable {
public:
    void open();
    void send_result(int result);

    int _open_count;
    connection() : _open_count(0) {}
    boost::thread_specific_ptr<connection> connection_ptr;

    connection& get_connection() {
        connection* p = connection_ptr.get();
        if (!p)
        {
            connection_ptr.reset(new connection());
            p = connection_ptr.get();
            p->open();
        }

        return *p;
    }
};

 

C++11에서 thread_local 키워드와 unique_ptr을 사용한다면 대체하여 구현 가능하다.

connection& get_connection2()
{
    thread_local std::unique_ptr<connection> connection_ptr2;
    connection* p = connection_ptr2.get();
    if (!p)
    {
        connection_ptr.reset(new connection());
        p = connection_ptr.get();
        p->open();
    }
    return *p;
}

 

스레드 인터럽트

interrupt()

스레드가 처리하는 작업을 중지할 때 사용할 수 있는 방법이다.

C++ 표준에는 포함되지 않았다. 대신 bool 변수를 두어 구현하는 것으로 똑같이 가능하다.

static void do_work()
{
    try 
    {
        while (!is_stop) 
        {
            // throw if thread is interrupted
            boost::this_thread::interruption_point();

            // do work
        }
    }
    catch (const boost::thread_interrupted& e)
    {
        cout << "thread execution ends" << endl;
    }
}

static bool need_to_be_idle() { return true; }
static void thread_interrupt()
{
    boost::thread worker(&do_work);

    // wait
    // do something...

    if (need_to_be_idle())
    {
        worker.interrupt();
    }

    worker.join();
}

 

스레드 그룹

boost::thread_group

특정 함수를 실행하는 스레드를 그룹으로 묶어 생명 주기를 같이 관리할 수 있다.

static void do_something() {}
static void thread_groups()
{
    boost::thread_group threads;

    for (int i = 0; i < 10; ++i)
    {
        threads.create_thread(&do_something);
    }

    threads.join_all();
}

 

스레드 전역으로 단 한번 호출되어야하는 초기화 혹은 함수

boost::call_once, boost::once_flag

boost::once_flag 값을 확인한 후 BOOST_ONCE_INIT 값이라면 boost::call_once에 전달된 함수를 호출하게 된다.

#include <boost/thread/once.hpp>
struct postprocessor 
{
    using answer_t = std::vector<std::string>;

private:
    mutable boost::once_flag _default_flag;
    mutable answer_t		 _default;

public:
    postprocessor()
        :
        _default_flag(BOOST_ONCE_INIT),
        _default()
    {}

    static answer_t read_defaults() { return {}; }

    answer_t act(const std::string& in) const {
        answer_t ret;
        if (in.empty()) {
            boost::call_once(_default_flag, [this]() {
                this->_default = read_defaults();
                });
            return _default;
        }

        return ret;
    }
};

 

여러 뮤텍스 잠그기

ABBA 문제

A락과 B락을 획득하려는 함수가 있다하자.

Func 1: A락을 획득하고 B락을 획득하는 코드이다.

Func 2: B락을 획득하고 A락을 획득하는 코드이다. 

 

스레드 1과 스레드 2가 동시에 각각 Func 1과 Func 2를 호출하려고 한다.

스레드 1 : A 락을 획득한 상태에서 B락을 획득하려고 한다.

스레드 2 : B 락을 획득한 상태에서 A락을 획득하려고 한다.

이 상황에서 스레드 1은 B락을 획득할 수 없고 스레드 2도 마찬가지로 A락을 획득할 수 없다. 이는 무한 대기에 빠지게 되고 데드락 상황으로 이어진다.

 

user 개체가 소유하고 있는 items를 다른 user와 서로 swap하는 상황

class user 
{
private:
    using item_t = int;
    boost::mutex _items_mutex;
    std::vector<item_t> _items;

public:
    void exchange_items(user& u)
    {
        // might cause ABBA problem
        boost::lock_guard<boost::mutex> l0(_items_mutex);
        boost::lock_guard<boost::mutex> l1(u._items_mutex);
        _items.swap(u._items);
    }
};

자신의 mutex를 획득하고 상대방의 mutex를 획득하는 exchange_items는 ABBA 문제를 야기할 수 있다.

 

해결방법

모든 함수에서 락을 획득하는 순서를 획일화 한다.

boost::make_unique_locks는 주어진 락에 대하여 획일화된 순서로 락을 얻는 것을 보장한다.

#include <boost/thread/lock_factories.hpp>

void exchange_items_tuple_lock(user& u)
{
    using lock_t = boost::unique_lock<boost::mutex>;

    // std::tuple<lock_t, lock_t> l = boost::make_unique_locks(_items_mutex, u._items_mutex);
    auto l = boost::make_unique_locks(_items_mutex, u._items_mutex);

    _items.swap(u._items);
}

 

make_unique_locks에서 가변 템플릿을 사용하지 않는 boost::lock을 사용하여  처리하기

#include <boost/thread/locks.hpp>

void exchange_items_protable(user& u)
{
    using lock_t = boost::unique_lock<boost::mutex>;

    lock_t l0(_items_mutex, boost::defer_lock);
    lock_t l1(u._items_mutex, boost::defer_lock);
    boost::lock(l0, l1);

    _items.swap(u._items);
}

 

C++ 17에서는 다음과 같이 처리할 수 있다.

void exchange_items_cxx17(user& u)
{
    std::scoped_lock l(u._items_mutex, _items_mutex);
    _items.swap(u._items);
}