Computer Science 기본 지식/운영체제

[C++ Thread] 약속과 미래 객체, std::promise / std::future

로파이 2021. 4. 30. 01:14

C++11에 도입된 std::promise와 std::future 객체를 알아본다.

 

약속 객체 (std::promise)

약속 객체는 주로 비동기적으로 실행되는 다른 쓰레드로 부터 계산된 값이나 예외를 저장할 수 있는 공간을 가지고 있다. 약속 객체가 쓰레드의 완료로 저장한 값을 전달하기 위해 미래 객체가 필요하고 미래 객체는 약속 객체에 의해 생성된다. 또한 약속 객체는 결과 전달을 한번만 하도록 설계되었다.

 

템플릿 정의

template<class R> class promise; // 기본 템플릿
template<class R> class promise<R&>; // 쓰레드간 객체를 통신하기위해 사용한다.
template<>        class promise<void>; // 상태없는 이벤트를 통신하기위해 사용한다.

 

- 템플릿 인수에 전달받을 값의 타입을 명시하여 선언한다.

std::promise<int> my_promise; // 이 약속 객체는 int를 전달받을 것이다.

 

- 자신이 전달할 미래 객체를 직접 생성한다.

future<int> my_future = my_promise.get_future();

 

약속 객체는 공유 상태가 존재하는데, 아직 계산되지 않은 혹은 계산된, 예외가 발생한 등의 상태 정보를 갖는다. 약속 객체는 이 공유 상태로 다음과 같은 일을 수행할 수 있다.

  • make ready: promise 객체는 공유 상태에 있는 결과나 예외를 가지고 있다. 이 때, 상태를 준비 상태로 설정하고 공유되 상태와 연관된 미래 객체를 블로킹 상태에서 깨워준다.
  • release: 약속 객체는 공유 상태에게 자신에 대한 참조를 하나 포기하게 한다. (스마트 포인터의 레퍼 카운트처럼) 마지막 참조라면 공유 상태를 파괴한다. std::aync에 의해 생성된 공유 상태가 아니라면, release는 차단되지 않는다.
  • abandon: 에러 코드가 std::future_errc::broken_promise인 std::future_error의 예외를 저장하고 공유 상태를 ready로 만든 다음 release한다.

약속 객체는 약속-미래 객체로 이루어진 통신 채널의 전달에서 마지막 엔드(end) 시스템이다. 즉, 공유 상태에 저장하는 연산은 future의 get과 같이 전달 값을 기다리는 함수에 "memory_order" 수정 순서의 동기화를 제공한다.

 

하지만 병행적으로 공유 상태에 접근하는 것은 충돌을 일으킬 수 있는데, 예를 들어 공유 미래 객체 (약속 객체는 전달 대상을 여러 둘 수 있다.)의 get 호출이 그 예이다. 이 때는, get의 결과가 read-only이거나 다른 동기화 방법을 제공해야한다.

 

- 예제

#include <vector>
#include <thread>
#include <future>
#include <numeric>
#include <iostream>
#include <chrono>

using namespace std;
void calculate(vector<int>::iterator first,
				vector<int>::iterator last,
				promise<int> result)
{
	int sum = accumulate(first, last, 0);
	result.set_value(sum);
}

void do_work(promise<void> barrier)
{
	this_thread::sleep_for(std::chrono::seconds(1));
	barrier.set_value();
}

int main()
{
	// promise<int>를 사용하여 쓰레드 간에 결과를 전송할 수 있다.
	vector<int> numbers = { 1,2,3,4,5,6,7,8,9,10 };

	// 상태 공유를 위한 프로미스 (약속) 객체
	promise<int> result;

	// 약속 객체를 전달받을 있는 퓨쳐 (미래) 객체
	future<int> ft = result.get_future();

	// 계산을 실행하는 쓰레드를 열고 계산된 결과를 받을 수 있도록 약속 객체를 전달한다.
	thread work_thread(calculate, numbers.begin(), numbers.end(),
						std::move(result));

	// 미래 객체는 get()을 호출하게 되면 약속된 값이 전달될 때까지 기다리게 된다.
	std::cout << "result =" << ft.get() << "\n";
	work_thread.join(); // 쓰레드가 종료될 때까지 기다린다.

	// 쓰레드간 상태를 신호할 수 있는 방법
	promise<void> barrier;
	future<void> barrier_future = barrier.get_future();
	thread new_work_thread(do_work, std::move(barrier));

	// wait을 호출하면 약속 객체가 ready 상태가 될 때까지 기다린다.
	barrier_future.wait();
	std::cout << "wake up" << std::endl;
	new_work_thread.join();
	
	return 0;
}

쓰레드에서 1~10의 더한 결과를 result라는 약속 객체를 통해 전달한다. result와 연관된 미래 객체 ft의 get 호출 시 결과를 전달받을 때까지 블로킹 상태에 있게된다.

void 타입의 약속 객체는 쓰레드의 시그널을 전달하는 기능을 수행한다.

 

미래 객체(std::future)
약속 객체와 연관되는 미래 객체는 약속 객체에 저장된 비동기 연산의 결과를 접근할 수 있는 인터페이스를 가진다.

 

std::async와 std:packaged_task를 통해 시동된 비동기 연산이나 std::promise에 의해 미래 객체를 비동기 작업을 만든 주체에게 제공해준다.

 

비동기 작업을 만든 주체는 미래 객체를 통해 값을 질의하거나 기다리고 결과를 추출하는 등의 연산을 이용한다. 이러한 연산들은 비동기 작업이 아직 끝나지 않아 값을 전달할 수 없을 때 블락될 수 있다.

 

비동기 작업은 완료가 되었을 때 공유 상태를 변경하는 연산을 통해 주체의 미래 객체에 알릴 수 있다.

 

std::future는 std::shared_future와 다르게 다른 비동기 작업의 return objects와 공유하지 않는 공유 상태를 참조한다.

 

- 약속 객체가 예외를 저장하고 미래 객체가 전달받아 예외 처리를 수행할 수 있다. 

#include <thread>
#include <iostream>
#include <future>

int main()
{
    std::promise<int> p;
    std::future<int> f = p.get_future();

    std::thread t([&p] {
        try {
            // code that may throw
            throw std::runtime_error("Example");
        }
        catch (...) {
            try {
                // store anything thrown in the promise
                p.set_exception(std::current_exception());
            }
            catch (...) {} // set_exception() may throw too
        }
    });

    try {
        std::cout << f.get();
    }
    catch (const std::exception& e) {
        std::cout << "Exception from the thread: " << e.what() << '\n';
    }
    t.join();
}

 

공유되는 미래 객체 (std::shared_future)

std::shared_future 클래스는 비동기 작업의 결과를 std::future와 마찬가지로 접근할 수 있다. std::future와 다른 점은 다수의 쓰레드가 같은 공유 상태를 기다리는 것을 허용한다. std::future는 복사가 불가능하고 이동만 가능한 객체이기 때문에 특정 비동기 결과를 한 인스턴스만 참조 가능하다. 이와 달리 std::shared_future는 복사 가능하고 shared_future 객체의 복사본들, 다수의 shared_future는 같은 공유 상태를 접근할 수 있다.

 

- 두 비동기 쓰레드를 대기시켰다가 주 쓰레드가 신호를 주기 시작하는 순간 실행하는 예제

#include <iostream>
#include <future>
#include <chrono>

using namespace std;
int main()
{
	promise<void> ready_promise, t1_ready_promise, t2_ready_promise;
	shared_future<void> ready_future = ready_promise.get_future();

	chrono::time_point<chrono::high_resolution_clock> start;

	// ready_future는 shared_future이기 떄문에 다수의 쓰레드에서 wait을 호출 가능하다.
	auto func1 = [&, ready_future]() -> chrono::duration<double, milli>
	{
		t1_ready_promise.set_value(); // func1에 연관된 약속 객체는 ready가 된다.
		ready_future.wait();  // 신호가 올 때까지 쓰레드를 블로킹한다.
		return chrono::high_resolution_clock::now() - start;
	};

	auto func2 = [&, ready_future]() -> chrono::duration<double, milli>
	{
		t2_ready_promise.set_value(); // func2에 연관된 약속 객체는 ready가 된다.
		ready_future.wait(); // 신호가 올 때까지 쓰레드를 블로킹한다.
		return chrono::high_resolution_clock::now() - start;
	};

	// 준비 신호를 받기 위한 미래 객체를 생성한다.
	auto future1 = t1_ready_promise.get_future();
	auto future2 = t2_ready_promise.get_future();

	// std::async는 함수를 비동기적으로 실행하고 함수 결과를 전달받을 수 있는 
	// std::future 객체를 반환한다.
	auto result1 = std::async(std::launch::async, func1);
	auto result2 = std::async(std::launch::async, func2);

	// 여기서 두 쓰레드가 준비될 때까지 기다린다.
	future1.wait();
	future2.wait();

	// the threads are ready, start the clock
	start = std::chrono::high_resolution_clock::now();

	// 시작!
	ready_promise.set_value();

	// func1, func2의 실행 결과를 전달받게 된다.
	std::cout << "Thread 1 received the signal "
		<< result1.get().count() << " ms after start\n"
		<< "Thread 2 received the signal "
		<< result2.get().count() << " ms after start\n";

	return 0;
}

1. 두 쓰레드를 실행하고 대기 완료가 되었는지 확인하기 위해 두 개의 약속 객체와 미래 객체를 생성한다.

 

2. 주 쓰레드에서 std::async 함수 호출을 하여 비동기 함수를 시동한다. std::async 함수는 시동 옵션과 함께 호출하면 해당 결과를 접근할 수 있는 std::future 객체를 반환한다.

 

3. 두 스레드의 비동기 실행에서 자신의 미래 객체의 set_value()를 통해 현재 쓰레드가 준비 완료됨을 주 쓰레드에게 알린다. 이 후 하나의 약속 개체를 참조하는 shared_future의 wait()을 통해 두 쓰레드가 같이 대기할 수 있게된다.

 

4. 주 쓰레드는 shared_future에 연관된 약속 객체를 통해 시작 신호를 준다.

 

5. 두 쓰레드가 동시에 블로킹에서 나와 함수를 반환하고 결과를 전달한다.

 

호출가능한 함수를 담아두는 std::packaged_task

std::packaged_task는 아무 호출가능한 객체를 감싸는(wrap) 클래스이며 해당 함수를 비동기적으로 시동할 것을 기대한다. 약속 객체가 내부에 있으며 get_future를 통해 연결된 미래 객체를 얻을 수 있다.

 

호출 가능한 객체 (람다, function, 함수 포인터)등을 생성자 인수에 전달하여 함수를 직접 호출할 수 있고 자신이 만들어낸 미래 객체에 값이나 예외를 전달한다.

#include <iostream>
#include <cmath>
#include <thread>
#include <future>
#include <functional>

// 어떤 유일한 함수
int f(int x, int y) { return std::pow(x, y); }

void task_lambda()
{
	std::packaged_task<int(int, int)> task(
		[](int a, int b)
	{
		return std::pow(a, b);
	});
	std::future<int> result = task.get_future();

	// 함수를 직접 호출 할 수 있다.
	task(2, 9);

	std::cout << "task_lambda:\t" << result.get() << "\n";
}

void task_bind()
{
	// bind된 함수
	std::packaged_task<int()> task(std::bind(f, 2, 11));
	std::future<int> result = task.get_future();

	task();

	std::cout << "task_bind:\t" << result.get() << std::endl;
}

void task_thread()
{
	// 일반 함수
	std::packaged_task<int(int, int)> task(f);
	std::future<int> result = task.get_future();

	std::thread task_td(std::move(task), 2, 10);
	task_td.join();

	std::cout << "task_thread:\t" << result.get() << "\n";
}

int main()
{
	task_lambda();
	task_bind();
	task_thread();
}

 

비동기 실행을 촉발하는 std::async

// C++ 11
template<class Function, class... Args>
std::future<std::result_of_t<std::decay_t<Function>(std::decay_t<Args>...)>>
    async( Function&& f, Args&&... args );

템플릿 인수로 함수 타입을 받는 것이 특징이다. 즉 함수와 함수호출을 위한 인자를 직접 생성할 때 받아 바로 함수를 실행할 수 있음을 시시한다. 또한 미래 객체를 반환함으로써 실행 결과를 잡아둘 수 있다.

// c++ 17
template< class Function, class... Args >
std::future<std::result_of_t<std::decay_t<Function>(std::decay_t<Args>...)>>
    async( std::launch policy, Function&& f, Args&&... args )

1) 특별한 실행 정책을 정할 수 있는 데 그 인자가 바로 std::launch라는 enum 클래스이다.

std::launch::async 혹은 std::launch::deferred를 지정할 수 있으며, 함수 f가 다른 쓰레드에서 실행되거나 미래 객체를 통해 결과를 질의 받을 때 동기적으로 수행할수 도 있다는 옵션을 임의로 결정하도록 두는 것이다.

 

2) launch policy가 둘 중 하나의 경우라면,

async flag(비동기)가 세워진다면, 해당 함수는 새로운 쓰레드에서 비동기적으로 실행된다. 함수 종료 시 공유 상태에 결과를 저장하며 호출자가 접근할 수 있는 future 객체에 의해 접근될 수 있다.

 

deferred flag(지연)가 세워진다면, async가 함수와 함수인자들을 가지고 쓰레드를 생성하는 방식이지만 새로운 실행흐름을 발생시키지는 않는다. "lazy evaluation"이 수행되는데, 시간이 설정되지 않은 wait 함수를 미래 객체에 대해 호출할 때 함수와 인자들의 복사본들을 가지고 실행하기 시작한다. 마찬가지로 결과값과 예외가 공유 상태에 저장되며 미래 객체에 후에 접근하는 것은 결과를 바로 반환한다.

 

비동기 플래그와 지연 플래그가 둘 다 세워져 있다면, 비동기 실행이 실행되거나 lazy evaluation이 실현될 수 있는데 구현마다 다르게 나타나므로 임의적이다.

 

비동기와 지연 플래그 둘 다 아니라면, 미정의 행동이다.

 

어떤 플래그이던 결과에 의해 공유 상태를 ready로 만드는 것과 결과를 미래 객체에 전달하는 것은 시간 순서대로 일어나며 동기화가 된다.

#include <iostream>
#include <vector>
#include <algorithm>
#include <numeric>
#include <future>
#include <string>
#include <mutex>

std::mutex m;

struct X
{
	void foo(int i, const std::string& str)
	{
		std::lock_guard<std::mutex> lk(m);
		std::cout << str << ' ' << i << '\n';
	}
	void bar(const std::string& str)
	{
		std::lock_guard<std::mutex> lk(m);
		std::cout << str << '\n';
	}
	int operator()(int i)
	{
		std::lock_guard<std::mutex> lk(m);
		std::cout << i << '\n';
		return i + 10;
	}
};

template<typename RandomIt>
long long int parallel_sum(RandomIt beg, RandomIt end)
{
	auto len = end - beg;
	if (len < 1000)
	{
		return std::accumulate(beg, end, 0);
	}

	// 비동기 쓰레드를 생성하여 또 다른 부분 합을 구한다.
	RandomIt mid = beg + len / 2;
	auto handle = std::async(std::launch::async,
		parallel_sum<RandomIt>, mid, end);

	long long int sum = parallel_sum(beg, mid);
	// 여기서 결과가 계산될 때까지 기다리게 된다.
	return sum + handle.get();
}

int main()
{
	std::vector<int> v(10000, 1);
	std::cout << "The sum is " << parallel_sum(v.begin(), v.end()) << '\n';

	X x;

	auto a1 = std::async(&X::foo, &x, 42, "Hello");

	auto a2 = std::async(std::launch::deferred, &X::bar, x, "world!");

	auto a3 = std::async(std::launch::async, X(), 43);

	// 지연 플래그로 설정된 쓰레드는 여기서 실행을 시작한다.
	a2.wait();
	
	// 비동기 플래그로 비동기 실행된 쓰레드의 실행 결과를 기다린다.
	std::cout << a3.get() << '\n';

	return 0;
}