Computer Science 기본 지식/운영체제

[C++ Thread] memory order 와 atomic 객체

로파이 2021. 4. 27. 20:57

메모리 수정 순서와 동기화 방법을 제공하는 atomic 객체

 

※ 다음 글들을 참고하였습니다.

modoocode.com/271

en.cppreference.com/w/cpp/atomic/memory_order#Relaxed_ordering

 

메모리가 수정되는 순서에 관하여 - 단일 스레드에서

1) 동적 파이프라인 스케줄링에 의해

다음 함수를 컴파일하여 실행했을 시 a 대입과 b 대입의 순서에 대한 문제

#include <iostream>

int a = 0;
int b = 1;

int func(int* arr)
{
	a = arr[1] + b;
	b = 5;

	return a;
}

int main()
{
	int arr[5] = { 0,1,2,3,4 };

	int result = func(arr);

	return 0;
}

 

최적화 옵션(Ox)의 어셈블리 코드

코드 흐름 상으로는 a의 계산이 선행될 것으로 기대하지만 동적 파이프라인 기법을 사용하는 프로세서는 위 코드의 명령어를 b 대입이 먼저 될 수 있는 것을 허용한다. 즉 컴파일러는 명령어 재배치를 통해 b 대입이 먼저 수행될 수 있도록 어셈블리 코드를 생성하게 된다. 실제 어셈블리 코드에서 a의 대입보다 b의 대입을 먼저 배치시켰다.

 

2) 캐시에 의한 재배치

어떤 변수가 캐시에 있고 이 변수에 대한 읽고 쓰기가 다른 변수보다 빠르므로 이 변수에 관한 명령어가 선행될 수 있다.

 

메모리가 수정되는 순서에 관하여 - 다중 스레드에서

다중 스레드를 사용하는 어떤 함수는 변수를 수정하여 쓰기를 실행할 수 있다.

다중 스레드간에 일치하는(consistent) 수정 순서는 메모리 읽기-수정-쓰기 후 a값의 변화 "순서"가 쓰레드간에 똑같은 방향으로 관측되어야 하는 것이다.

a가 1->3->5->7로 변화할 때 각 쓰레드 주기에서 따른 관측 결과는 다음 순서를 가질 수 있다.

  • T1: a=3 -> a=5
  • T2: a=1 -> a=3 -> a=5 -> a=7
  • T3: a=3 -> a=5 -> a=7

각 스레드는 항상 수정 순서가 일치하는 관찰을 보인다.

메모리 수정 순서가 일치하는 의미는 꼭 같은 시간에 두 스레드가 같은 값을 읽는다는 의미는 아니다. a=3로 쓰는 CPU가 쓰기 결과를 다른 CPU에 동기화하지 않는다면 a=3을 읽는 CPU와 다른 CPU에서 동시에 실행되는 스레드는 a=1을 읽을 수 있다.

 

std::atomic

다중 스레드 간에 일치하는 수정 순서를 보장하기 위해 메모리 읽기-수정-쓰기가 원자적으로 일어나야 한다. 한 변수에 대해 이러한 연산이 원자적으로 일어나는 것을 보장하는 std::atomic 객체를 사용할 수 있다.

 

memory_order

아토믹 객체는 원자적 연산과 메모리 수정 순서를 보장한다. 만약 읽기-수정-쓰기 3 연산을 한 번에 수행하지 않고 여전히 원자적 연산이지만 다른 스레드에서 한번씩 수행할 수 도 있다. (적재/저장을 분리하는 등으로) 이 때, std::atomic의 읽기-쓰기 순서를 지정하는 행동 타입을 설정할 수 있다.

 

Relaxed Ordering

memory_order_relaxed

int r1 = 0, r2 = 0;
std::atomic<int> x(0), y(0);
void thread1()
{
	r1 = y.load(std::memory_order_relaxed); // A
	x.store(r1, std::memory_order_relaxed); // B
}

void thread2()
{
	r2 = x.load(std::memory_order_relaxed); // C
	y.store(42, std::memory_order_relaxed); // D
}

가장 유연한 메모리 순서 옵션으로 일반적인 상황처럼 전 후 배치되는 연산이 컴파일러에 의해 재배치 될 수 있음을 의미한다.  두 함수를 다른 스레드에서 실행하면 r1=r2=42의 결과가 나올 수 있는데, 명령어 재배치로 D -> A -> B -> C 순으로 실행이 이루어 질 때 이러한 결과를 얻을 수 있다.
 1. y = 42 (thread2)
 2. r1 = 42 (thread1)
 3. x = r1 = 42 (thread1)
 4. r2 = x = 42 (thread2)

 

단순 증가 연산을 수행하는 아토믹 객체

#include <vector>
#include <iostream>
#include <thread>
#include <atomic>
 
std::atomic<int> cnt = {0};
 
void f()
{
    for (int n = 0; n < 1000; ++n) {
        cnt.fetch_add(1, std::memory_order_relaxed);
    }
}
 
int main()
{
    std::vector<std::thread> v;
    for (int n = 0; n < 10; ++n) {
        v.emplace_back(f);
    }
    for (auto& t : v) {
        t.join();
    }
    std::cout << "Final counter value is " << cnt << '\n';
}

cnt를 단순 증가시키는 것은 모든 읽기-수정-쓰기가 원자적으로 일어나므로 다중 쓰레드에서 3 연산이 분리되어 있지 않다. share_ptr의 참조 횟수 카운팅 증가가 단순한 relaxed 모델을 사용한다. (감소 연산은 acquire-release 모델이 필요함)

 

Release-Acquire Ordering

relaxed에서 다중 쓰레드 환경에서 메모리 접근 순서에 문제가 될 수 있는 범위는 두 가지가 될 수 있다.

1) a에 쓰기 명령 이전의 쓰기 명령어들이 재배치되어 이후에 삽입되어 해당 내용이 다른 스레드에게 보여지지 않는 경우

2) a를 읽기 명령어 이후 쓰기 명령어들이 앞으로 재배치되어 다른 쓰레드에 보여서 문제가 되는 경우

따라서 a의 읽기와 쓰기를 기점으로 한 스레드에서 쓰기 이전 내용이 다른 쓰레드에서 읽는 시점에서 동기화가 되어있어야 한다. 따라서 두 경우에서 release-acquire 순서를 유지해 각 표시한 방향으로 쓰기 명령어들이 재배치하는 것을 금지한다.

 

스레드 A에서 공유 변수 shared_data1, shared_data2를 쓰기를 실행한다면, 쓰레드 B에서 a를 load 할 때 그 결과가 보여야 한다.

반대로 a를 load 하고 일어나는 상황이 load 이전으로 옮겨지면 Thread A에서 a를 store 하기 전 상황이 더럽혀질 수 있다.

 

- 예시

#include <thread>
#include <atomic>
#include <cassert>
#include <string>
 
std::atomic<std::string*> ptr;
int data;
 
void producer()
{
    // 쓰기 이전의 영역
    std::string* p  = new std::string("Hello");
    data = 42;
    // ---------------
    ptr.store(p, std::memory_order_release);
}
 
void consumer()
{
    std::string* p2;
    while (!(p2 = ptr.load(std::memory_order_acquire)))
        ;
    // 읽기 이후의 영역
    assert(*p2 == "Hello"); // never fires
    assert(data == 42); // never fires
}
 
int main()
{
    std::thread t1(producer);
    std::thread t2(consumer);
    t1.join(); t2.join();
}

아토믹 객체 하나를 사용해서 메모리 순서를 동기화하는 방법을 보여준다. producer 함수의 store 이전에 항상 p = "Hello" 그리고 data는 42를 가지고 있는데 data 쓰기 명령어가 store 이후에 배치되는 것을 금지하고 있기 때문이다. 따라서 consumer 함수에서 아토믹 객체 p가 스트링 "Hello"로 초기화되고 data에 42가 쓰인 상황이 반드시 p2가 load, 읽기가 수행되는 시점에 동기화가 된다. 

 

- memory_order_acq_rel

양쪽으로 재배치되는 것을 금지한다. 아토믹 객체 이후에 쓰기 되는 값들은 다른 스레드에서 아토믹 객체를 얻을 때 (acquire) 보이게 된다. 이 메모리 순서 플래그는 읽기-수정-쓰기가 모두 수행하는 연산 ++cnt (fetch_add())와 같은 연산 시 적용될 수 있다. 

#include <thread>
#include <atomic>
#include <cassert>
#include <vector>
 
std::vector<int> data;
std::atomic<int> flag = {0};
 
void thread_1()
{
    data.push_back(42);
    flag.store(1, std::memory_order_release);
}
 
void thread_2()
{
    int expected=1;
    while (!flag.compare_exchange_strong(expected, 2, std::memory_order_acq_rel)) {
        expected = 1;
    }
}
 
void thread_3()
{
    while (flag.load(std::memory_order_acquire) < 2)
        ;
    assert(data.at(0) == 42); // will never fire
}
 
int main()
{
    std::thread a(thread_1);
    std::thread b(thread_2);
    std::thread c(thread_3);
    a.join(); b.join(); c.join();
}
    bool compare_exchange_strong(_TVal& _Expected, const _TVal _Desired,
        const memory_order _Order = memory_order_seq_cst) noexcept { // CAS with given memory order
        long _Expected_bytes = _Atomic_reinterpret_as<long>(_Expected); // read before atomic operation
        long _Prev_bytes;
#if _CMPXCHG_MASK_OUT_PADDING_BITS
        if constexpr (_Might_have_non_value_bits<_TVal>) {
            _Storage_for<_TVal> _Mask{_Form_mask};
            const long _Mask_val = _Atomic_reinterpret_as<long>(_Mask);

            for (;;) {
                _ATOMIC_CHOOSE_INTRINSIC(_Order, _Prev_bytes, _InterlockedCompareExchange,
                    _Atomic_address_as<long>(_Storage), _Atomic_reinterpret_as<long>(_Desired), _Expected_bytes);
                if (_Prev_bytes == _Expected_bytes) {
                    return true;
                }

                if ((_Prev_bytes ^ _Expected_bytes) & _Mask_val) {
                    _CSTD memcpy(_STD addressof(_Expected), &_Prev_bytes, sizeof(_TVal));
                    return false;
                }
                _Expected_bytes = (_Expected_bytes & _Mask_val) | (_Prev_bytes & ~_Mask_val);
            }
        }
bool compare_exchange_strong(T& expected, const T desired, memory_order)

아토믹 객체 compare_exchange_strong 함수

  • expected 값을 가지고 있다면 desired로 값을 바꾸고 true를 반환하고
  • expected 값을 가지고 있지 않다면 desired 값으로 바꾸지 않고 expected값을 현재 아토믹 객체의 값으로 설정하고 false를 반환한다.

따라서 compare_exchange_strong는 읽기-수정-쓰기를 하고 있으므로 memory_order_acq_rel 메모리 순서 플래그를 사용해야 한다.

세 개의 스레드 예제는 flag 수정 흐름에서 flag는 0->1 ->2 순서로 이루어지도록 while block을 통해 구현하고 있다.

 

- release/acquire 관계를 이용한 mutex 구현

#include <iostream>
#include <atomic>
#include <thread>

int cnt = 0;
std::atomic<int> flag(0);
void increase()
{
	int i = 0;
	while (i < 1000000)
	{
		int expected = 0;
		if (flag.compare_exchange_strong(expected, 1, std::memory_order_acquire))
		{
			// lock 획득 성공
			++cnt;
			flag.store(0, std::memory_order_release);
			++i;
		}
	}
}

int main()
{
	printf("is_lock_free %d\n", flag.is_lock_free());
	std::thread ts[10];
	for (int i = 0; i < 10; ++i)
	{
		ts[i] = std::thread(increase);
	}

	for (int i = 0; i < 10; ++i)
	{
		ts[i].join();
	}
	printf("%d\n", cnt);
	return 0;
}

acquire-release를 이용한 mutex 구현

  • acquire: 이전의 명령들이 후로 배치되는 것을 금지한다.
  • release: 이후 명령들이 전으로 배치되는 것을 금지한다.

따라서 increase를 실행하는 쓰레드는 lock을 획득하는 시점을 기준으로 전처리 (i값 비교, excpected 값 비교)와 후처리(++i 연산)와 같은 영역이 서로 침범되지 않는다.

 

Sequentially-consistent Ordering

memory_order_seq_cst 플래그로 설정된 아토믹 객체는 memory_order_seq_cst와 마찬가지로 store 이전의 상황이 다른 스레드의 load시 동기화가 되는 기능을 제공한다. 또한 memory_order_seq_cst는 "single total modification order" 단일 수정 순서 = 전지적 시점에서 실행된 순서가 모든 아토믹 객체를 사용하는 쓰레드 사이에 동기화된다.

 

#include <atomic>
#include <iostream>
#include <thread>

std::atomic<bool> x(false);
std::atomic<bool> y(false);
std::atomic<int> z(0);

void write_x() { x.store(true, std::memory_order_release); }

void write_y() { y.store(true, std::memory_order_release); }

void read_x_then_y() {
  while (!x.load(std::memory_order_acquire)) {
  }
  if (y.load(std::memory_order_acquire)) {
    ++z;
  }
}

void read_y_then_x() {
  while (!y.load(std::memory_order_acquire)) {
  }
  if (x.load(std::memory_order_acquire)) {
    ++z;
  }
}

int main() {
  thread a(write_x);
  thread b(write_y);
  thread c(read_x_then_y);
  thread d(read_y_then_x);
  a.join();
  b.join();
  c.join();
  d.join();
  std::cout << "z : " <
}

 

기존 release-acquire 관계를 이용하는 a, b, c, d의 스레드를 실행했을 때 보장하는 것은 다음과 같다.

  • write_x()에서 x(store) 이전의 문맥 = read_x_then_y()의 x(load) 시점의 문맥
  • write_y()에서 y(store) 이전의 문맥 = read_y_then_x()의 y(load) 시점의 문맥

따라서 실행 결과 값은 다음과 같이 나올 수 있다.

Z 값 전지적 실행 순서  
z = 2 a -> b -> c(d) -> d(c) x,y 결과가 정해진 이후 이기 때문에 c, d의 결과는 순서에 상관없이 같다.
z = 1 a -> c -> b -> d
b -> d -> a -> c
쓰레드 c는 y가 0으로 관측되기 때문에 z를 증가시키지 않는다. 혹은 그 반대.
z = 0 CPU 1: a -> c
CPU 2: b -> d
이러한 상황은 두 흐름이 다른 CPU에서 일어날 때 발생한다.

z = 0인 상황이 발생하는 이유

CPU의 계산 결과가 항상 메모리에 써지는 것은 아니다. 실제 CPU 명령어들을 빨리 처리하기 위해 메모리에 접근하는 것을 피하기 때문에 캐시에 두었다가 캐시에서 내보내질 때 메모리에 쓰는 방식 write-back을 많이 사용한다. 이런 경우 각 CPU마다 고유한 캐시를 사용할 경우 메모리에 직접 쓰기 전까지는 동기화가 안 되어 있을 수 있다.

 

기존 release-acquire는 store 이전의 상황이 load 시점에서 보이는 것을 보장하나 그러한 release-acquire 발생 사건이 모든 쓰레드에 보여지는 것은 아니다. memory_order_seq_cst는 어떤 아토믹 객체에 대한 실제 수정 순서가 모든 스레드에게 동일하게 보임을 보장한다.

 

atomoic 객체의 기본 memory_order 플래그의 기본 값은 memory_order_seq_cst이다.

 

- memory_order_seq_cst 예제

#include <thread>
#include <atomic>
#include <cassert>
 
std::atomic<bool> x = {false};
std::atomic<bool> y = {false};
std::atomic<int> z = {0};
 
void write_x()
{
    x.store(true, std::memory_order_seq_cst);
}
 
void write_y()
{
    y.store(true, std::memory_order_seq_cst);
}
 
void read_x_then_y()
{
    while (!x.load(std::memory_order_seq_cst))
        ;
    if (y.load(std::memory_order_seq_cst)) {
        ++z;
    }
}
 
void read_y_then_x()
{
    while (!y.load(std::memory_order_seq_cst))
        ;
    if (x.load(std::memory_order_seq_cst)) {
        ++z;
    }
}
 
int main()
{
    std::thread a(write_x);
    std::thread b(write_y);
    std::thread c(read_x_then_y);
    std::thread d(read_y_then_x);
    a.join(); b.join(); c.join(); d.join();
    assert(z.load() != 0);  // will never happen
}