Advanced C++

[Modern C++] (7-1) 동시성 API

로파이 2021. 5. 1. 01:05

C++11를 기점으로 도입된 동시성(Concurrency)을 언어와 표준 라이브러리에 도입하였다. 표준 라이브러리의 동시성 구성요소들 (과제, 미래, 스레드, 뮤 텍스, 조건 변수, 원자적 객체)에 대해 알아보도록 한다.

 

1. 스레드(Thread) 기반 프로그래밍보다 과제(Task) 기반 프로그래밍을 선호하라

 

스레드 기반 프로그래밍 이란

- 새로운 스레드를 실행하는 함수를 할당하여 생성하는 방식

int doAsyncWork();

std::thread t(doAsyncWork);

과제 기반 프로그래밍

auto fut = std::async(doAsyncWork); // fut은 future를 뜻한다.

std::async에 전달된 doAsyncWork는 과제로 취급한다.

 

과제 기반 프로그래밍을 선호하는 이유

스레드 기반 호출에서는 함수의 반환값을 전달받을 방법이 없다. 하지만 과제 기반 프로그래밍에서는 std::async가 돌려주는 미래 객체(fut)의 get 멤버 함수를 통해 결과를 전달받을 수 있다.

또한 스레드 기반 프로그래밍에서 doAsyncWork가 예외를 던지면 프로그램이 죽으나 과제 기반에서는 예외 또한 값처럼 미래 객체에 전달된다.

 

'스레드' 용어

- 하드웨어 스레드 : 실제 계산을 수행하는 스레드를 의미한다. 현대 컴퓨터 아키텍처는 CPU 코어당 하나 이상의 하드웨어 스레드를 제공한다.

- 소프트웨어 스레드 : 운영체제가 하드웨어 스레드들에서 실행되는 모든 프로세서와 일정을 관리하는 데 사용하는 소프트웨어 스레드, OS 스레드나 시스템 스레드라고도 한다. 대체로, 하드웨어 스레드보다 많은 소프트웨어 스레드를 생성할 수 있다. 소프트웨어 스레드가 차단되어도 차단되지 않은 스레드를 실행함으로 산출량을 기대할 수 있다.

- C++ 표준 라이브러리의 std::thread : 하나의 C++ 프로세스 안에서 std::thread 객체는 바탕 소프트웨어 스레드에 대한 핸들로 작용한다. C++ 스레드 객체는 기본 생성자로 생성하거나 다른 객체로 이동되었을 경우 바탕 핸들이 null값을 가질 수 있다.

 

소프트웨어 스레드는 제한된 자원으로 시스템이 제공할 수 있는 것보다 많은 소프트웨어 스레드를 생성하려고 하면 std::system_error 예외가 발생한다.

 

소프트웨어 스레드가 부족한 상황을 피하는 방법

1) 현재 스레드에서 실행한다.

현재 스레드가 GUI처럼 사용자 반응속도가 중요한 실행 흐름이라면 과부하가 걸려 반응성에 문제가 될 수 있다.

 

2) 다른 std::thread 객체가 끝나기를 기다렸다가 생성하도록 한다.

과다구독(oversubscription) 문제 : 실행 준비가 된(차단되지 않은) 소프트웨어 스레드가 하드웨어 스레드보다 많은 상황을 가리킨다. 스레드 스케줄러는 총 하드웨어 실행 시간을 여러 조각으로 나누어 소프트웨어 스레드들에게 배분하게 된다. 과다 구독 상황에서는 한 소프트웨어 스레드에 배분된 시간이 짧아지고 문맥 교환으로 인한 오버헤드가 증가하게 된다.

그러한 문맥교환으로 인해 CPU의 캐시는 소프트웨어 스레드에 대해 Cold 상태이며 또한 '기존' 스레드가 사용하던 레지스터들로 인해 스레드가 오염된다.

 

하드웨어와 소프트웨어 스레드 개수의 이상적인 비율은 실행 가능한 소프트웨어 스레드 개수에 의존적이며 문맥 전환 비율, CPU 캐시를 얼마나 효율적으로 사용하는지에 따라 다르다.

 

std::async

적절한 소프트웨어 스레드의 갯수를 신경 쓰고 싶지 않다면 std::async를 쓰는 것이 적절하다. std::async는 새 소프트웨어 스레드를 생성하지 않을 수도 있는데, 대신 연관된 미래 객체가 get이나 wait를 호출함으로써 실행 결과가 필요한 시점에서 해당 스레드를 실행 해달 로고 스케줄러에게 요청할 수 있다.

 

std::async는 호출할 때 launch policy라는 enum class 타입 인수를 전달하게된다. 만약, 해당 실행을 새로운 스레드를 생성해서 하고 싶다면 std::launch::async를 넘겨주는 것이 바람직하다.

 

과제 기반 프로그래밍이 스레드 기반 보다 스레드를 일일이 관리해야 하는 수고로움이 없으나 스레드를 직접 다루는 게 적합한 경우도 있다.

 

1) 바탕 스레드 적용 라이브러리의 API에 접근해야 하는 경우

 _Thr._Hnd =
            reinterpret_cast<void*>(_CSTD _beginthreadex(nullptr, 0, _Invoker_proc, _Decay_copied.get(), 0, &_Thr._Id));

실제 스레드 객체의 핸들 타입을 유추할 수 있는 대목인데, thread의 생성으로 실행 흐름이 invoke(발생)하면, 그 함수 실행 중 위와 같은 라인이 존재한다. 여기서 _beginthreadex는 Windows 운영체제가 제공하는 스레드를 생성하는 시스템 함수이다. 시스템 함수는 C++ 라이브러리의 std::thread보다 저수준의 기능을 제공하는데 (우선순위 스케줄링 등) 이를 이용하기 위해 std::thread의 native_handle를 호출해서 시스템 스레드 핸들을 얻을 수 있다.

 

2) 응용 프로그램의 스레드 사용량을 최적화 할 수 있어야 하는 경우

하드웨어 속성이 정해진 특별한 컴퓨터, 서버를 위한 소프트웨어를 개발하는 경우

 

3) C++ 동시성 API가 제공하는 것 이상의 스레드 적용 기술을 구현해야 하는 경우

C++ 라이브러리를 이용하여 구현한 스레드 풀 기능을 다른 플랫폼에서는 다르게 구현해야 할 수 있다. 

 

2. 비동기성이 필수일 때에는 std::launch::async를 지정하라

std::async는 시동 방침에 따라 실행 흐름을 발생시키는 방식이 다른데,

  • std::launch::async 시동 방침을 지정하면 f는 반드시 비동기적으로 다른 스레드에서 실행된다.
  • std::launch::deferred 시동 방침을 지정하면 f는 연관된 미래 객체가 get이나 wait이 호출될 때에만 실행될 수 있다. 이는 async의 호출 시점에서 '지연된' 실행이라고 하기도 한다.

기본 시동 방침 = 비동기와 지연 옵션 모두

auto fut = std::async(f);

auto fut2 = std::async(std::launch::async | std::launch::deferred, f);

함수 f에 대해 기본 시동 방침은 비동기와 지연 옵션에 OR값으로 어느 옵션으로 실행될지는 임의적으로 결정된다. 덕분에 스레드를 관리하는 것이 더 유연해졌으며 동시적 프로그래밍을 편리하게 만들어준다.

 

기본 시동 방침에 따른 기대 결과

auto fut = std::async(f);

함수 f가 스레드 A에서 기본 시동 방침으로 실행되면 다음의 영향이 있다.

  • f는 지연 실행될 수도 있으므로 f와 t가 동시에 실행될지 예측하는 것이 불가능하다.
  • f가 fut에 대해 get이나 wait를 호출하는 스레드와 다른 스레드에서 실행되는 지 또한 알 수 없다.
  • 프로그램의 모든 가능한 경로에서 fut에 대한 get이나 wait 호출이 일어난다는 보장이 없을 수 도 있으므로 f가 반드시 실행될 것 인지 예측하는 것이 불가능할 수 도 있다.

기본 시동 방침의 스케줄 유연성은 thread_local과 잘 맞지 않을 수 있는데 어떤 스레드가 스레드 로컬 변수를 사용할지 모르기 때문이다.

 

지연된 과제에 대해 wait_for나 wait_until를 호출하여 기다리면 해당 스레드의 실행이 시동되는 것이 아니라 std::future_status::deferred를 즉시 반환하며 while 루프가 다른 조건에 의해서 빠져나올 수 있게 되어 있다면 루프는 절대 종료되지 않는다.

 

- 예제

#include <iostream>
#include <future>
#include <thread>
#include <chrono>

int main()
{
    std::future<int> future = std::async(std::launch::deferred, []() {
        std::this_thread::sleep_for(std::chrono::seconds(3));
        return 8;
    });

    std::cout << "waiting...\n";
    std::future_status status;
    do {
        status = future.wait_for(std::chrono::seconds(1));
        if (status == std::future_status::deferred) {
            std::cout << "deferred\n";
        }
        else if (status == std::future_status::timeout) {
            std::cout << "timeout\n";
        }
        else if (status == std::future_status::ready) {
            std::cout << "ready!\n";
        }
    } while (status != std::future_status::ready);

    std::cout << "result is " << future.get() << '\n';
}

참고 : en.cppreference.com/w/cpp/thread/future/wait_for

실행 결과

 

따라서 기본 시동방침을 써도 되는 상황이라고 판단되는 경우 다음 조건이 모두 성립할 때 가능하다.

  • 과제가 get이나 wait을 호출하는 스레드와 반드시 동시적으로 실행되어야 하는 것은 아니다.
  • 여러 스레드 중 어떤 스레드의 thread_local 변수들을 읽고 쓰는지가 중요하지 않다.
  • std::async가 돌려준 미래 객체가 get이나 wait이 반드시 호출된다는 보장이 있거나 실행이 되지 않아도 된다.
  • 과제가 지연된 상태일 수 도 있다는 점이 wait_for나 wait_until을 사용하는 코드에 반영되어 있다.

그렇지 않다면 시동 방침을 std::lauch::async로 지정하여 실행하는 것을 고려해야 한다.

auto fut = std::async(std::laynch::async, f);

// 자동적으로 비동기 실행을 시동한다.
template<typename Function, typename... Ts>
inline std::future<typename std::result_of<Function(Ts...)>::type>
reallyAsync(Function&& f, Ts&&... params)
{
    return std::async(std::launch::async, std::forward<Function>(f), std::forward<Ts>(params)...);
}

 

3. std::thread들을 모든 경로에서 합류 불가능하게 만들어라

 

모든 std::thread는 합류 가능(joinable) 상태이거나 합류 불가능(unjoinable) 상태이다.

 

합류 가능 상태

현재 실행 중인 스레드 혹은 실행 중 상태로 전이할 수 있는 스레드에 대응된다. 또한 실행 완료된 바탕 스레드에 해당하는 std::thread도 합류 가능하다.

 

합류 불가능한 상태

  • 기본 생성자로 생성된 std::thread : 실행할 함수가 없다.
  • 다른 std::thread로 이동되어 빈 상태가 된 상태
  • join에 의해 실행 완료된 바탕 스레드에 해당되지 않게 된다.
  • detach에 의해 대응되는 바탕 스레드와 연결이 끊어진다.

※ 문제가 되는 경우 : 합류 가능한 스레드의 소멸자가 호출되면 프로그램 실행이 종료된다.

 

- 어떤 오래 걸리는 계산을 스레드를 생성하여 실행하는 예제

filter 함수를 인자로 받아 후보 숫자 중 좋은 수만 골라 goodVals에 추가하는 스레드가 있다. 이는 doWork의 실행에서 다른 메인 연산이 따로 있고 이 연산이 반응성이 필요할 때 다른 스레드를 생성해서 실행하도록 한다.

constexpr auto tenMillion = 100000000000;

bool doWork(std::function<bool(int)> filter, int maxVal = tenMillion)
{

    std::vector<int> goodVals;

    // 시간이 오래 걸리는 계산을 실행한다.
    std::thread t([&filter, maxVal, &goodVals] {
        for (auto i = 0; i <= maxVal; ++i)
        {
            if (filter(i))
                goodVals.push_back(i);

        }
    });

    auto nh = t.native_handle();

    //...

    // 다른 작업을 실행하며 반응성을 높인다.

    if (conditionIsSatisfied())
    {
        t.join();
        performComputation(goodVals);
        return true;
    }
    return false;
}

doWork()에서 conditionIsSatisfied() = true 라면 t는 합류되고 문제없이 함수가 종료된다. 그러나 false이거나 예외를 던진다면 함수 끝에서 t가 소멸되는데, t는 여전히 합류 가능하기 때문에 프로그램 실행이 종료된다.

 

그렇다고 암묵적 join이나 암묵적 detach가 좋은 선택도 아니다.

 

암묵적 join이 적용된다면 측정하기 어려운 프로그램 성능 이상이 나타날 수 있으며 conditionIsSatisfied() = false임에도 계산을 계속 실행하는 것이 직관적으로 이해하기 어렵다. 

암묵적 detach의 경우 바탕 스레드가 끊어져 백그라운드에서 데몬 상태로 작업을 진행하는데 이때 참조하는 변수들이 이미 파괴되고 없을지도 모른다. 스레드가 사용하는 람다가 갈무리한 참조자 goodVals, filter는 함수 종료 시 스택 프레임으로부터 pop 돼서 상위 분기로 돌아가고 실행 루틴에서 보이지 않는 메모리가 된다. 이 와중에 다른 함수를 또 호출하는데 detach가 메모리를 사용한다면, 자신이 변경하지도 않았는데 메모리가 변하는 것을 관찰할 수도 있다.

 

따라서 std::thread를 '모든 경로에서' 합류 불가능하게 만드는 것은 프로그래머의 몫이다. 가장 흔한 방법은 해당 객체의 생명 주기가 되는 범위에서 소멸자를 호출할 때 반드시 합류 불가능하게 만드는 것인데 그러한 기법을 쓰는 객체를 RAII 객체라고 한다. Resource Acquisition Is Initialization

#include <thread>

class ThreadRAII
{
public:
	enum class DtorAction { join, detach};
	// 소멸자 행동을 정의한다.
	ThreadRAII(std::thread&& t, DtorAction a)
		:
		action(a), t(std::move(t)) {}

	~ThreadRAII()
	{
		if (t.joinable())
		{
			if (action == DtorAction::join)
			{
				t.join();
			}
			else 
			{
				t.detach();
			}
		}
	}

	std::thread& get() { return t; }
private:
	DtorAction action;
	std::thread t;
};

RAII Thread 클래스의 특징

  • 생성자는 오른 값만 받는다 (이동만 된다.)
  • thread 멤버는 맨 나중에 배치하고 초기화도 맨 나중에 한다. (초기화 시 실행이 되므로)
  • get() 함수는 바탕 std::thread에 접근할 수 있는 인터페이스를 가진다.
  • 소멸자에서 합류 가능한 상태인지 먼저 점검한다. -> 선택된 행동 (join/detach)를 수행한다.

소멸자를 정의한 경우 이동 함수들이 작성되지 않는데, 이동 기능이 지원되지 않을 이유는 없다.

#include <thread>

class ThreadRAII
{
public:
	enum class DtorAction { join, detach};
	// 소멸자 행동을 정의한다.
	ThreadRAII(std::thread&& t, DtorAction a)
		:
		action(a), t(std::move(t)) {}

	~ThreadRAII()
	{
		if (t.joinable())
		{
			if (action == DtorAction::join)
			{
				t.join();
			}
			else 
			{
				t.detach();
			}
		}
	}
	    // 이동 기능
        ThreadRAII(ThreadRAII&&) = default;
        ThreadRAII& operator=(ThreadRAII&&) = default;
        
	std::thread& get() { return t; }
private:
	DtorAction action;
	std::thread t;
};