Advanced C++

[Modern C++] (7-2) 동시성 API

로파이 2021. 5. 1. 14:46

C++11부터 도입된 동시성 API관련 항목을 이어 공부한다.

 

4. 스레드 핸들 소멸자들의 다양한 행동 방식을 주의하라.

 

std::thread, 미래 객체 std::future<T> 모두 시스템 스레드에 대응되는 핸들이라고 할 수 있다. std::thread는 소멸자 호출시 바탕 스레드가 합류가능상태이면 프로그램이 종료되어버린다. 하지만 미래 객체는 암묵적 join/detach를 한 것처럼 프로그램이 종료되지는 않는다.

 

미래 객체는 스레드-미래 객체의 통신 채널의 끝 단자의 역할을 한다. 비동기적으로 실행하는 피호출자는 계산 결과를 통신 채널에 기록한다.(std::promise를 통해서) 호출자는 미래 객체를 통해 결과를 읽는다.

 

피호출자에서 호출자로 정보의 흐름

계산 결과가 저장되는 곳은?

피호출자에 담아둔다: std::promise가 피호출자의 생명 주기안에 있다면 연산의 끝에서 파괴되어버리고 만다.

호출자에 담아둔다: 미래 객체가 유일할 때는 괜찮지만 std::shared_future의 예로 1 피호출자 대 다 호출자 관계를 만들 수 있는데, 이 때 std::promise를 어느 호출자가 마지막 소멸하는 미래 객체인지 알고 저장해두기 어렵다.

 

답은 공유 상태에 저장해 두는 것이다. 공유 상태는 힙 기반 객체로 표현되나 형식,인터페이스,구현은 표준이 구체적으로 명시한 것은 아니다.

 

공유 상태

공유 상태가 중요한 것은 미래 객체의 소멸자 행동이 그 미래 객체와 연관된 공유 상태가 결정하기 때문이다.

 

1) std::async에 의해 시동된 비지연 과제에 대한 공유 상태를 참조하는 마지막 미래 객체의소멸자는 과제가 완료될 떄까지 차단된다. -> 암묵적인 join을 수행한다.

auto future_obj = std::async(std::launch::async, cal_func);

2) 다른 모든 미래 객체의 소멸자는 해당 미래 객체를 파괴한다. 비동기적으로 실행되고 있는 과제의 경우 바탕 스레드에 암묵적 detach를 수행하는 것과 비슷하다. 지연된 과제를 참조하는 마지막 미래 객체의 경우 이는 과제가 절대로 실행되지 않음을 뜻한다.

 

"정상" 행동은 미래 객체의 소멸자가 미래 객체를 파괴한다. 합류 시키지도 않고 탈착하지도 않으며 그 무엇도 실행하지 않는다. 다만, 미래 객체가 참조하는 공유 상태 안의 참조 횟수를 감소시킬 뿐이다.

 

"정상" 행동에 대한 예외는 다음 조건들을 만족하는 미래 객체에 대해서만 일어난다.

  • 미래 객체가 std::async 호출에 의해 생성된 공유 상태를 참조한다.
  • 과제의 시동 방침이 std::launch::async 이다.
  • 미래 객체가 공유 상태를 참조하는 마지막 미래 객체이다.

예외 행동은 비동기적으로 실행되는 과제가 완료될 때 까지 소멸자의 실행이 차단되는 것이다. (암묵적 join)

 

std::async로 시동된 비지연 과제에 대한 공유 상태의 특별한 규칙이 필요한 이유는?

 

암묵적 detach에 관한 문제가 암묵적 join보다 심각한 사안이였고 프로그램 종료 역시 C++ 표준위원회가 바라지 않았다.

 

이에 따라 임의의 미래 객체에 대해 소멸자가 과제가 완료되기를 기다리느라 차단될 것 인지 알아내는 것이 불가능하게 되었다.

std::vector<std::future<void>> futs; // 소멸자가 호출되지 않을 수 있다.

// Widget의 소멸이 일어나지 않을 수도 있다.
class Widget
{
public:
 //...
private:
 std::shard_future<double> fut;
};

 

이러한 특별한 행동은 std::async 호출에 의해서만 발생한다. 그러나 공유 상태의 생성은 여러 방법으로 생길 수 있는데 예를 들면 std::packaged_task 객체 같은 것이다.

int calcValue(); // 실행할 함수

std::packaged_task<int()> pt(calcValue); // 비동기적 시행을 위해 calcValue를 포장한다.

auto fut = pt.get_future(); // pt 에 대한 미래 객체를 얻는다.

std::pacakged_task는 실행할 함수를 포장할 수 있으며 객체 내부에 std::promise를 가지고 있다. 따라서 연관된 공유 상태를 미래 객체를 생성하면서 공유 상태를 생성하게 된다.

 

- packaged_task를 이용한 스레드 생성

{
     std::packaged_task<int()> pt(calcValu);
     
     auto fut = pt.get_future();
     std::thread t(std::move(pt));
     
     //... 어떤 일이 수행되는가
}

packaged_task의 실행은 어떤 스레드를 생성할 때 전달하여 실행하거나 packaged_task는 호출 가능한 객체이므로 다른 스레드에서 operator()로 실행할 수 있다. 어느 것 이든 과제를 할당받은 스레드 종료 방식(join, detach, 프로그램 종료)에 따르므로 미래 객체의 소멸자의 특별한 행동을 고려할 필요가 없다.

 

5. 단발성 사건 통신에는 void 미래 객체를 고려한다.

특정한 사건(event)가 발생해야 작업을 진행할 수 있는 비동기 실행 과제에게 그 사건이 발생햇음을 알려주는 또 다른 과제를 두는 것이 유용한 경우가 있다.

 

- 조건 변수를 이용한 사건 발생 알림

조건 변수를 이용하여 어떤 사건을 검출하고 사건 발생을 다른 사건에 알릴 수 있다. 

 

필요 변수

std::condition_variable cv;
std::mutex m;

검출 과제의 코드

// ... 사건을 검출한다.

cv.notify_one();

검출 과제 코드는 단순히 사건을 검출하고 조건 변수를 통해 발생을 알린다. notify_one() 차단되어 있는 스레드 중 하나를 깨우는 함수이다.

 

반응 과제의 코드

// ...
{
    std::unique_lock<std::mutex> lk(m);
    cv.wait(lk);
    
    //...
}

반응 과제에서는 std::unique_lock을 이용하여 뮤텍스를 잠그고 사건 발생이 일어날 때 까지 차단된 상태로 둔다.

 

처리해야할 문제

- 반응 과제가 wait을 실행하기 전에 검출 과제가 조건 변수를 통지하면 반응 과제는 처리하지 못한다.

- wait 호출문은 가짜 기상(spurious wakeup)을 고려하지 않는다. (흔히 있는 일)

그렇다면 정말로 기상을 했는 지 판별해야할 조건이 필요하다.

cv.wait(lk, []{ return 사건 발생 여부; });

자신이 기다리던 조건이 정말로 참인지 과제가 판단할 수 있게 된다.

 

그러한 조건을 std::atomic 변수로 설정하는 것도 생각할 수 있다.

// 전역 변수
bool flag(false);


// ... 사건을 검출한다.
{
   std::lock_guard<std::mutex> g(m);
   flag = true;
}
cv.notify_one();
// ...
{
    std::unique_lock<std::mutex> lk(m);
    cv.wait(lk, []{ return flag; });
    
    //...
}

반응 과제는 wait을 호출해도 가짜 기상을 판별할 수 있다. 그래도 과연 부울 변수까지 필요할 까라는 생각이 들 수 있는데, 사건이 발생했을 때만 통지하게되는데 부울 변수를 또 한번 점검해야되기 때문이다. 원천적으로 가짜 기상을 구별하는 통지는 없는 것 일까

 

조건 변수, 뮤텍스, 플래그를 사용하지 않는 대안은 검출 과제가 설정한 미래 객체를 반응 과제가 기다리게 하는 것이다.std::promise는 사건을 발생을 공유 상태에 기록할 수 있으며 미래 객체가 이 기록을 탐지할 수 있다. 

 

void 타입의 std::promise

std::promise<void> p;

반환 타입이 void의 promise는 값 전달없이 사건의 발생만 알리기에 적합하다.

 

검출 과제의 코드

// ...
p.set_value(); // 반응 과제에 알린다.

 

반응 과제의 코드

// ...
p.get_future().wait(); // p에 해당하는 미래 객체를 기다린다.
// ...

 

이런 설계 방식은 mutex를 필요로 하지도 않으며 반응 과제가 wait으로 대기하기 전에 검출 과제가 자신의 std::promise를 설정해도 작동하며, 가짜 기상도 없다. (가짜 기상은 조건 변수에서만 일어난다.)

 

그럼에도 std::promise를 사용할 겨우 걱정할 오버헤드가 있다.

std::promise와 미래 객체 사이에는 공유 상태가 있으며 대체로 동적 할당되기 때문에 힙 기반 할당 및 해제 비용을 유발한다.

std::promise는 단발성 메커니즘으로 한 번만 설정하고 쓰며 다시 재사용할 수 없다.

 

응용의 예

주로 시스템 스레드를 유보된 상태로 생성하는 방식에 많이 사용된다. 스레드에 필요한 자원을 할당하는 비용을 미리 처리해 두고 실제 실행 준비가 되었을 때, 지연 없이 실행할 수 있게 한다. 즉, 생성 시점과 해당 스레드 함수의 실행시점 사이에서 한 번만 유보시키는 설계에 적용한다.

 

- 예제 코드

std::promise<void> p;

void react();

void detect() // 검출 과제
{
  std::thread t([] // 반응 과제
  {
   p.get_future().wait(); // 스레드를 유보 시킨다.
   react(); // 반응 함수를 실행한다.
  }
  
  //...
  
  p.set_value();
  
  //...
  
  t.join();
}

검출 과제를 담당하는 함수에서 반응 과제를 유보시키고 실행한다 하자. 반응 과제를 담당하는 스레드는 함수 끝에서 합류 불가능하게 하기 위해 t.join()을 실행한다.

 

- 여러 스레드에 한 번에 통지해야할 경우

std::shared_future를 이용하여 하나의 std::promise에 연관된 미래 객체를 여럿 두고 공유 상태를 공유할 수 있다. std::promise에 의해 사건이 발생하면 shared_future 객체의 wait()을 호출했었던 스레드가 모두 깨어나게 된다.

std::promise<void> p;

void detect()
{
    auto sf = p.get_future().share();

    std::vector<std::thread> vt;

    for (int i = 0; i < 10; ++i)
    {
        vt.emplace_back([sf] {sf.wait(); react(); });
    }

    // ...
    
    p.set_value();

    //..

    for (auto& t : vt)
    {
        t.join();
    }
}

 

6. 동시성에는 std::atomic을 사용하고 volatile을 특별한 메모리에 사용한다.

 

std::atomic은 원자적 연산을 하드웨어적으로 실행하게 하는 객체로 Read-Modify-Write (RMW)가 원자적으로 일어날 수 있게 한다.

 

- 원자적 연산

std::atomic<int> a(0);

++a;

--a;

a.store(10);

int b = a.load();

a.fetch_add(-3);

a.fetch_or(0x1111);

그러한 연산은 단순 증감 연산이나, 더하기, 빼기, 비트 연산과 적재(load), 저장(store)에 한정되어 있는데 이는 하드웨어적 구현으로 가능한 원자적 연산의 최대 범위이기 때문이다.곱셈이나 나눗셈은 ALU의 연산 단계가 하나보다 많기 때문에 원자적으로 구현하기 힘들다.

 

- 명령어 재배치를 제한

std::atomic<bool> valAvailable(false);

auto impValue = computeImportantValue();

valVailable = true;

std::atomic 객체를 이용하면 true로 설정될 때, 이전의 명령어가 이후로 배치되는 것을 금지한다. 만약 std::atomic 객체를 쓰지 않는다면, 컴파일 후 다음과 같은 명령어의 재배치가 일어날 수 도 있다.

bool valAvailable = false;

/* 원본 코드
auto impValue = computeImportantValue();

valVailable = true;
*/

// 명령어 재배치로 인한 코드

valVailable = true;

auto impValue = computeImportantValue();

이러한 재배치의 요인은 computeImpotantValue() 계산이 오래걸릴 경우 먼저 알 수 있는 값을 대입할 수 있도록 하는 최적화 과정에서 수행될 수 있기 때문이다.

 

volatile은 사실 동시성 API 주제와 상관 없는 키워드이다. volatile은 어떤 변수가 반드시 메모리에 있어야하고 읽기, 쓰기와 같은 연산이 반드시 메모리를 통해 일어난다는 것을 의미한다. (캐시나 레지스터에 임시로 저장되지 않는다.)

 

volatile은 이러한 재배치를 제한하는 것도 아니고 그렇다고 std::atomic에 해당하지 않는 경쟁 조건이 일어날 수도 있는 일반 변수처럼 취급된다. volatile이 쓰이는 곳은 주로 메모리 대응 입출력(memory-mapped I/O)으로 외부 장치에 연결된 메모리에 접근해서 값을 읽거나 쓸 때 사용하는 메모리를 지칭할 때 사용한다.

 

메모리 대응 입출력 장치, 온도계 값을 읽어 오는 예제

volatile void* memory_device_id_19037 = connect_hardware_device();

float temp_celcius = (float) *memory_device_id_19037;

위 와 같은 사용을 주로 한다. volatile은 이러한 메모리에 대한 읽기 쓰기 연산에 대한 최적화를 금지하도록 명시하는 것인데 다음과 같이 쓴 코드의 volatile이 없는 변수는 최적화시 코드가 일부 제거 될 수 있다.

temp_celcius = (float) *memory_device_id_19037;
temp_celcius = (float) *memory_device_id_19037;  // 더 최신된 값

*memory_device_id_19037 = 32.1;
*memory_device_id_19037 = 30.5; // 더 최신된 값

// 최적화 된 코드
temp_celcius = (float) *memory_device_id_19037;
// temp_celcius = (float) *memory_device_id_19037; 

*memory_device_id_19037 = 32.1;
// *memory_device_id_19037 = 30.5;

컴파일러는 어떤 메모리에 값을 기록한 후 값을 다시 쓰지 않는다면 기록하는 행위를 제거할 수 도 있다는 뜻이다. 따라서 특별한 메모리를 다룰 때는 최적화를 수행하지 말아야하며 변동되는 값들을 메모리에서 읽거나 쓰도록 강제한다.