Computer Science 기본 지식/운영체제

[C++ Thread] Lock-Free Programming - (1) Lock Free ? / Lock Free Stack

로파이 2021. 10. 13. 20:57

Lock-Free 코드란?

- Concurrency와 Scalibility를 가지는 동기화 방법

  • 코드상 존재하는 블로킹/기다림을 제거하거나 줄이는 것

- 블록킹을 사용하면서 생기는 잠재적인 문제

  • 경쟁 조건 (Race Condition): 잘못된 락의 사용 혹은 락의 부재
  • 데드락 (DeadLock) : 락이 다른 스레드에 의해 잠긴 상태에서 영원히 해제가 안되는 상태
  • Simplicity vs Scalabilty : 락을 이용한 프로그램은 단순하다. 하지만 락에 의해 발생하는 병목 현상으로 사용자의 증가로 인한 성능 이슈가 발생한다 (Low Scalability). 블로킹에 의해 발생하는 콘보이 효과 / 우선순위 뒤집힘(Priority Inversion) 등.

Compare And Exchange (CAS)
Loc-Free Programming에서 주로 사용되는 CAS 함수는 메모리 읽기/비교/쓰기를 모두 원자적으로 실행한다.

CAS는 대상 메모리의 값이 기대값(Expected)와 같다면 목표값(Desired)으로 초기화 시킨다.

Read-Modify-Write 작업이 원자적으로 일어난다.

template<typename T>
bool CompareAndExchange(T* const& Memory, T& Expected, const T& Desired)
{
	if (*Memory == Expected)
	{
		*Memory = Desired;
		return true;
	}

	Expected = *Memory;
	return false;
}

 

"Lock-Freedom"의 3가지 수준

- Wait-Free : 항상 어떤 스레드도 Wait을 하지 않고 모든 쓰레드가 실행 중이며, 즉 바운더리가 있는 CPU 사이클 내에 어떤 작업이 완료될 것이라고 보장된다.

- Lock-Free : 항상 적어도 하나의 쓰레드가 실행 중임을 보장받는다. 모든 Wait-Free는 Lock-Free이지만 반대는 성립하지않는다.

- Obstruction-Free : 가장 약한 정의로 적어도 하나의 쓰레드가 언젠가는 방해받지 않고 고립된 상태로 실행될 것이라 보장한다.

 

SpinLock은 Lock-Free인가?

Lock-Free라고 알려져 있는 SpinLock의 경우 직접적인 lock을 사용하는 뮤텍스, 세마포어와 같은 동기화를 위한 원시 객체와 다르게 CPU 폴링 기법으로 작업의 진행가능 여부(Flag)를 묻기 때문에 그렇게 생각할 수 있다.

하지만 엄밀히 말해서 SpinLock의 경우 LockFree라고 할 수 없다. 그 이유는 SpinLock을 사용하는 두 쓰레드에서 한 쓰레드가 Flag를 내리지 않고 중지된 경우 다른 쓰레드가 임계영역을 진입할 수 있음에도 Flag를 얻지 못해 무의미한 대기를 하게 된다. 따라서 "Lock-Freedom"의 3가지 수준 중 Lock-Free는 적어도 하나의 쓰레드가 진행됨을 보장받아야하는데, SpinLock은 그렇지 못한 상황이 발생하고 기아 현상을 유발한다.

 

std::atomic<T>

atomic 개체는 해당 객체가 참조하는 메모리에 대해 읽고 쓰는 연산을 원자적 단위로 수행하는 것을 보장한다. load()/store()와 같은 원자적 읽기, 쓰기를 지원한다. 또한 원자 객체는 CAS와 관련된 함수 compare_and_exchange 함수를 지원한다.

타입 T는 TriviallyCopyable한 타입으로 리터럴형 형식(int,char,float,double...)과 구조체가 일반적이며 가상함수가 있는 클래스는 TriviallyCopyable 하지 않다.

atomic<int> data;
int read = data.load();
data = 5; // data.store(5);

int comparend = 5, desired = 10;
data.compare_exchange_weak(comparend, desired);

atomic 개체의 메모리 순서 지정

다중 쓰레드에서 공유 자원의 값이 일관적인 순서에 따라 동일하게 보여짐을 보장하는 메모리 순서 지정에 관해 atomic 객체가 관여하며 쓰레드에서 자원을 얻거나(Acquire) 반납(Release)하는 순간을 선언한다. 

메모리 순서

Acquire 시점에서 이전에 Thread A에서 Release가 있었다면 메모리 순서상 Release 전 쓰기 결과는 모두 Thread B에서 보여진다.

 

atomic 원자성의 주기

atomic 개체를 사용했더하더라도 원자적 연산 이후에는 쓰레드가 선점될 수 있는 갭이 존재한다. 다음과 같이 if->then의 논리 사이에 무슨 일어날지 모른다. 더 이상 쓰레드 안전하지 하므로 정적 객체를 생성할 때 더블 체크를 해야한다. 

class Widget
{
private:
	static mutex mtx;
	static atomic<Widget*> pInstance;

public:
	// Double-Checked Locking
	static Widget* GetInst()
	{
		if (pInstance == nullptr)
		{
			// 이 구간은 더 이상 쓰레드 안전하지 않다.
			lock_guard<mutex> lk(mtx);
			// 한번 더 확인
			if (pInstance == nullptr)
			{
				pInstance = new Widget();
			}
		}
		return pInstance.load();
	}
};

std::once_flag

Double Check 문제를 한번에 해결하는 std::once_flag는 프로그램 실행 시작 시점부터 전역적으로 딱 한번만 실행하는 것을 보장한다. std::call_once(once_flag, [](){// 인스턴스 초기화 });

static Widget* GetInst()
{
	if (pInstance == nullptr)
	{
		std::call_once(flag, []() { pInstance = new Widget; });
	}
	return pInstance;
}

 

Wait-Free 자료구조 설계

Wait-Free는 바운드된 실행 스탭 안에서 어떤 작업이 완료될 것이라는 것을 보장해야한다. Wait-Free 자료구조는 만들기 극히 어렵다. 모든 쓰레드가 작업이 바운드된 실행 스탭내에서 완료되는 것을 보장하려면 다른 쓰레드에 의해 언제끝날지 모르는 락이 존재하면 안된다.

 

Lock-Free 자료구조 설계

Lock-Free는 항상 적어도 하나의 쓰레드가 자료구조에 동시적으로 접근하는 것을 보장해야한다. LockFree 큐라면 하나의 쓰레드가 Push, 하나의 쓰레드가 Pop을 하고 있을 수도 있다. 자료구조에 접근중인 쓰레드가 스케줄러에 의해 중지되었을 지라도 다른 쓰레드가 중지된 쓰레드를 기다리지 않고 작업을 완료할 것이라고 기대될 것이다. 

 

LockFree 기반 자료구조의 장단점

락 프리 자료구조를 사용하는 주된 이유는 최대한의 동시성 프로그래밍을 하기 위해서이다. 락 기반의 컨테이너는 다른 쓰레드가 락을 사용하여 진입가능한 쓰레드의 블록과 기다림이 발생한다. 락 프리 자료구조는 항상 어떤 쓰레드가 어느 정도 실행을 할 것이라 기대하고 기다림이 없는 자료구조는 다른 쓰레드에 영향 없이 항상 실행 중에 있다.

또한 락 기반의 프로그래밍은 오류로 인해 락을 가지고 있는 쓰레드가 강제 종료되면 다른 쓰레드는 영원히 블록된다. 락이 없는 프로그래밍은 해당 오류 쓰레드의 정보만 손실될 뿐 다른 쓰레드는 여전히 실행 가능하다.

하지만 락이 없는 프로그래밍을 설계하기 위해서 경쟁 조건을 만들지 않기 위해 항상 조심해야하며 원자 개체를 사용했더하더라도 경쟁 조건의 발생으로 미정의 행동을 유발할 수 있다. 또한 메모리 순서에 대해 쓰레드간에 변수 값의 변화가 보여야하는 것도 고려해야한다.

 

Lock-Free Stack

LIFO 특징을 갖는 Stack을 먼저 설계해본다. 

 

기본적으로 노드는 단방향 연결리스트를 사용한다.

template<typename T>
struct Node
{
	T value;
	Node<t>* Next;
	Node():value{}, Next(nullptr){}
	Node(const T& _value): value(_value), Next(nullptr){}
};

 

Push()

스택

스택의 Push

 

1) 새로운 노드를 만든다.

2) 새로운 노드의 다음 노드를 스택의 Head로 지정한다.

3) 새로운 노드를 Head로 지정한다.

 

동기화가 필요없는 코드는 다음과 같을 것이다.

void Push(const T& value)
{
    Node<T>* NewNode = new Node<T>(value);

    // 동기화 없이 스택에 노드를 추가하는 방법
    NewNode->Next = Head;
    Head = NewNode;
}

 

 

멀티쓰레드를 사용하는 자료구조는 공유 변수 Head의 일관성을 보장하지 않는다고 생각해야한다. 즉 NewNode의 Next를 Head로 대입하는 것과 동시에 다른 쓰레드에서 NewNode를 생성, 같은 Head를 Next로 대입할 것이다. 

 

이를 위해 CAS 기법으로 Head가 최근에 바뀌었는지 확인하고 동기화하여 Push를 하도록 해야한다.

  bool compare_exchange_weak(_Ty& _Expected, const _Ty _Desired) noexcept

compare_exchage_weak에는 Expected와 Desired를 매개변수로 넘겨야하는데 어떤 변수를 넘겨야할까?

  1. 항상 NewNode->Next는 Head 이어야한다.
  2. 1이 만족할 때만 Head에 NewNode를 대입해야한다.

따라서 2)에 따라 Desired에 NewNode를 Head는 위 함수를 호출해야하는 주체이다. 이로써 Head는 atomic<Node<T>*>가 되어야하는 것이다. 1)을 한번 대입하고 난 다음 Expected에 NewNode->Next를 전달하여 Head가 NewNode->Next로 잘 대입되었는지 검사하여 만약 같다면 Push가 잘 이루어진 것이고 아니라면 NewNode->Next에 실제 Head의 값(포인터)가 대입되므로 1)을 자동으로 다시 실행한 셈이 된다. (CAS는 Expected와 다르면 Expected를 실제 atomic 객체의 값으로 업데이트한다.

void Push(const T& value)
{
	Node<T>* const NewNode = new Node<T>(value);
	/*
	동기화 없이 스택에 노드를 추가하는 방법
	NewNode->Next = Head;
	Head = NewNode;
	*/
	NewNode->Next = Head.load(); // 일단 헤드를 다음을 추가한다.
	// 이 사이에 [Context Switching]으로 다른 쓰레드에 의해 Head가 변경될 수 있다.
	// 그렇다면 새로운 Head를 pNode->pNext에 할당해야 할 것이다.
	// CAS 기법으로 일단 그 사이에 헤드가 바뀌지 않았는지 확인하는 것으로 데이터를 집어 넣으면 된다.
	// compare_exchange_weak는 루프문에 사용되며 Head가 Compare값과 비교해서 다르면 Compare는 실제 Head값으로 초기화된다.
	while (!Head.compare_exchange_weak(NewNode->Next, NewNode)){}
}

 

Pop()

스택의 Pop

Pop의 과정은 Push와 반대로 생각하면 된다.

1) Head를 Node에 지정한다.

2) Head->Next를 Head로 만든다.

3) Node를 삭제한다.

void Pop(T& value)
{
	//동기화 없이 스택에서 노드를 제거하는 방법
	if(Head == nullptr) 
    	return;

	Node<T>* OldHead = Head;
	Head = OldHead->Next;
    
	value = OldHead->value;
    
	delete OldHead;
}

 

 

Push와 비슷한 원리로 Head는 올바르게 대입되어 OldHead인지 CAS로 확인하도록한다.

void Pop(T& value)
{
	/*
	동기화 없이 스택에서 노드를 제거하는 방법
	if(Head == nullptr) return;

	Node<T>* OldHead = Head;
	Head = OldHead->pNext;
	delete pNode;
	*/
	Node<T>* OldHead = Head.load();
	while (OldHead && !Head.compare_exchange_weak(OldHead, OldHead->Next)) {};

	if (OldHead == nullptr)
		return;

	value = OldHead->value;

	delete OldHead;
	return;
}

LockFreeStack 다시 설계하기 - Version1

- 지금 노드를 삭제하는 것이 안전한 상황인가?

삭제하려는 노드를 참조하는 쓰레드는 반드시 현재 진행중인 쓰레드로 유일해야한다. 그렇지 않다면 어떤 쓰레드에서는 삭제된 이후의 포인터를 역참조하려할 것 이다. Pop()에서 이미 pNode를 할당하고 꺼내진 상태였다면 더이상 pNode를 참조할 방법이 없기 때문에 Push() 함수에서 pNode가 참조될 일은 없다. 따라서 Pop() 함수에서 delete를 하는 시점에서 Pop()을 하고 있는 쓰레드가 하나 인지 체크해야한다. 이는 현재 Pop() 중인 쓰레드를 카운트하여 오직 하나의 쓰레드가 delete하고 있을 때, 안전한 상황일 것이다.

 

- std::shared_ptr로 삭제한 포인터의 역참조와 같은 메모리 문제 회피하기

장점 : 공유 참조를 이용하여 해당 개체의 소멸을 직접 신경쓰지 않아도 되는 장점이 있다. 댕글링 포인터를 역참조 할 일이 없다.

단점 :

공유 참조 카운팅에 대한 오버헤드가 있으며 shared_ptr의 공유 카운트 동기화가 락-프리인지는 CPU 지원여부에 따라 다르다는 것이다. std::atomic_is_lock_free를 이용하여 shared_ptr의 lock_free 여부를 확인할 수 있다.

C++20에서 사용할 수 있는 기능 : C++20 부터 atomic<shared_ptr<T>> 셰어드 포인터 형식의 아토믹 객체의 원자적 연산 compare_exchange_weak를 지원한다.

 

- Pop()의 value 복사로 값 반환

// 값 복사의 함정
value = OldHead->value;

임의의 T 인자에 대해 값 복사가 예외에 안전할 수 없다. 값 복사시에 예외가 발생하면 반환값 value에 대한 정보를 잃고 스택도 이미 Pop된 상황이 되어버린다. 따라서 포인터 형식으로 레퍼런스를 반환하고 마찬가지로 shared_ptr로 관리하게 한다.

 

- LockFreeStack 버전 1

/*
	LockFreeStack - V1
	1) 모든 노드는 shared_ptr로 참조된다.
	2) 이제 Node의 Data는 스마트 포인터로 관리된다.
	 - Data의 직접적인 복사로 인한 예외를 발생시키지 않는다.
*/

template<typename T>
class LockFreeStackV1
{
private:
	struct Node
	{
		/*
			이제 Node의 Data는 스마트 포인터로 관리된다.
			Data의 직접적인 복사로 인한 예외를 발생시키지 않는다.
		*/
		shared_ptr<T> Data;
		shared_ptr<Node> Next;
		Node(const T& _Data) :
			Data(make_shared<T>(_Data))
		{}
	};
	using NodePtr = std::shared_ptr<Node>;
	std::atomic<NodePtr> Head;
public:
	LockFreeStackV1()
	{
	}
	~LockFreeStackV1()
	{
		while (Pop()) {}
	}
	void Push(T const& Data)
	{
		NodePtr const NewNode = make_shared<Node>(Data);
		NewNode->Next = Head.load();
		while (!Head.compare_exchange_weak(NewNode->Next, NewNode));
	}
	shared_ptr<T> Pop()
	{
		NodePtr OldHead = Head.load();
		while (OldHead && !Head.compare_exchange_weak(OldHead, OldHead->Next));

		return OldHead ? OldHead->Data : shared_ptr<T>();
	}
};

메모리 순서 고려하기

아토믹 개체를 사용할 때 메모리 순서를 고려하여 기본 인자 std::memory_order_seq_cst 대신 relaxed 혹은 release/acquire로 약간의 최적화를 기대할 수 도 있다. Push에서 일어난 일이 Pop에서 항상 보여야(Visible)하므로 Push happens before Pop이라고 볼 수 있다. 따라서 Push의 compare_exchange_weak에서 성공했을 시 메모리 모델을 release로 Pop의 compare_exchange_weak에서 성공했을시 메모리 모델을 acquire로 둔다. load() 는compare_exchange_weak 이전에 일어나고 실패시 변경없이 loop를 진행하기 때문에 relaxed로 둔다.

	void Push(T const& Data)
	{
		NodePtr const NewNode = make_shared<Node>(Data);
		NewNode->Next = Head.load(std::memory_order::relaxed);
		while (!Head.compare_exchange_weak(NewNode->Next, NewNode, std::memory_order::release, std::memory_order::relaxed));
	}
	shared_ptr<T> Pop()
	{
		NodePtr OldHead = Head.load(std::memory_order::relaxed);
		while (OldHead && !Head.compare_exchange_weak(OldHead, OldHead->Next, std::memory_order::acquire, std::memory_order::relaxed);

		return OldHead ? OldHead->Data : shared_ptr<T>();
	}

 

LockFreeStack 다시 설계하기 - Version2

- shared_ptr의 동기화에 대해 lock-free를 지원하지 않는 경우

CPU 아키텍쳐에 따라 shared_ptr의 공유 카운트의 동기화가 lock-free하지 않을 수 있다. 이 상황에서는 카운팅을 직접하여 LockFree 자료구조를 직접 설계해야할 것이다.

 

1) External/Internal 카운팅

  • External : 외부에서 노드 포인터를 Read했을 때 External 카운트를 증가시킨다.
  • Internal : 함수내에서 작업이 이루어지는 도중에 임시적으로 카운트하는 것으로 Read가 끝날때마다 Internal 카운트를 감소시킨다.

 - Pop()과 같이 더 이상 해당 노드를 접근할 수 없는 상태가 되면, Internal 카운트를 External 카운트 - 1만큼 증가시킨다. 더 이상 외부로의 참조가 없기 때문에 Internal 카운트가 현재 참조 되고 있는 External 카운트 수만큼 감소할 것이라고 기대할 것이기 때문이다.

 - 모든 Read의 종료로 Internal 카운트가 0이 될 때, 해당 노드의 메모리를 삭제한다.

 

2) 설계

struct CountedNodePtr 

- 해당 구조체는 스마트 포인터처럼 포인터를 감싸고 External 카운트와 함께 외부에서 보여지는 Wrapper이다.

stuct Node

- 읽기가 끝날때마다 Internal 카운트를 감소하기 위해 Interal 카운트를 가진다.

 

template<typename T>
class LockFreeStackV2
{
private:
	struct Node;

	// 외부에서 보여지는 포인터
	// 해당 포인터로만 Node를 접근한다.
	struct CountedNodePtr
	{
		int ExternalCount;
		Node* Ptr;
	};

	struct Node
	{
		std::shared_ptr<T> Data;
		std::atomic<int> InternalCount;
		CountedNodePtr Next;

		Node(T const& _Data) :
			Data(std::make_shared<T>(_Data)),
			InternalCount(0)
		{}
	};

	std::atomic<CountedNodePtr> Head;

모든 노드는 CountedNodePtr로 감싸져 관리된다.

CountedNodePtr에 대해 아키텍쳐에 따라 크기가 달라서 포인터 크기만큼 보다 큰 원자적 연산은 lock-free하지 않을 수 있다.

 

- Push()

새로운 노드를 만들고 기존 Head에 할당하는 것처럼 CAS기법을 사용한다. Head가 바뀔때 까지(스택에 삽입될 때까지) Pop() 쓰레드에서 참조되지 않으므로 ExternalCount를 1 (Head로의 외부 참조), Internal Count를 0으로 초기화한 노드를 삽입하기만 하면 된다.

그 사이에 다른 쓰레드의 Push()나 Pop()으로 Head가 바뀌었다면 알아서 다시 루프를 돌 것이다.

void Push(T const& Data)
{
	CountedNodePtr NewNode;
	NewNode.Ptr = new Node(Data);
	NewNode.ExternalCount = 1; // Head에 의한 외부 참조로 1
	// Head를 읽고 있지만 ExternalCount를 신경쓰지 않아도 된다.
	// 다음 두 라인 사이에 Pop()이되거나 Push()가 되면 Head의 Ptr이 바뀌기 때문에 다시 루프를 돌 것이다.
	NewNode.Ptr->Next = Head.load();
	while (!Head.compare_exchange_weak(NewNode.Ptr->Next, NewNode));
}

- Pop()

모든 내/외부 참조는 Pop()에 의해 바뀐다. Pop()에서 External Count의 의미는 몇개의 쓰레드가 현재 해당 노드를 외부로 참조하고 있나를 나타내고 Internal Count는 한번 외부 참조된 노드가 얼마나 반납(읽기가 끝났는가)를 나타낸다. Internal Count는 0이었다가 Pop()되는 도중에만 변하며 해당 노드를 기준으로 Pop()을 진행중 인 쓰레드가 여럿이 있다면 Internal Count가 0이 되는 쓰레드가 마지막 읽기를 끝마쳤으므로 해당 노드를 지울 수 있게 된다. 

 

private:
	void IncreaseHeadCount(CountedNodePtr& OldCounter)
	{
		// (1) Head load로 인한 ExternalCount 증가에 대한 LockFree 동기화
		CountedNodePtr NewCounter;
		do
		{
			NewCounter = OldCounter;
			++NewCounter.ExternalCount;
		} while (!Head.compare_exchange_strong(OldCounter, NewCounter));

		OldCounter.ExternalCount = NewCounter.ExternalCount;
	}

public:
	std::shared_ptr<T> Pop()
	{
		CountedNodePtr OldHead = Head.load();
		for (;;)
		{
			IncreaseHeadCount(OldHead);

			// (2) 이제 해당 Head를 역참조해도 안전하다. (외부 카운트를 미리 증가시켜놓았음)
			Node* const Ptr = OldHead.Ptr;

			// 비어있는 상태
			if (!Ptr)
			{
				return std::shared_ptr<T>();
			}

			// (3) 해당 라인까지 추가로 Pop()한 쓰레드가 없다면 OldHead는 Head로 동기화가 되어 있어야한다.
			//    그 사이에 다른 쓰레드에 의해 읽기가 수행되었다면(ExtenalCount 증가) 다시 OldHead를 동기화한다.
			//    Push로 Head가 변경되어도 마찬가지
			if (Head.compare_exchange_strong(OldHead, Ptr->Next))
			{
				std::shared_ptr<T> Res;
				Res.swap(Ptr->Data);
                
				// (4) 내부 참조 증가
				// 더 이상 Ptr를 접근하는 것이 불가능하기 때문에
				// InternalCount에 ExternalCount - 2 만큼 증가
				const int CountIncrease = OldHead.ExternalCount - 2;

				// 외부 참조(CountIncrease) + 내부 참조(fetch_add 결과) == 0 이라면 삭제
				if (Ptr->InternalCount.fetch_add(CountIncrease) == -CountIncrease)
				{
					delete Ptr;
				}

				return Res;
			}
            
			// (5) 동시성 쓰레드의 OldHead 읽기 종료
			// Head의 바뀜으로 Read 종료를 마킹
			// 만약, 현재 쓰레드가 해당 노드를 외부로 참조하는 마지막 쓰레드 였다면
			// Ptr 삭제
			else if (Ptr->InternalCount.fetch_sub(1) == 1)
			{
				delete Ptr;
			}
		}
	}
};

(1) IncreaseHeadCount

스택에서 Pop()할 노드를 Head에서 불러올 것이다. 이 때 외부 참조가 증가하므로 1 증가시키기 위해 Head가 그 사이에 변하였는지 혹은 또 다른 Pop()을 수행하는 쓰레드의 선점으로 외부 참조자 증가할 수 있다.

(2) OldHead의 외부 참조가 1 증가되었음

이제 외부 참조를 증가시켜 놓았으므로 역참조를 해도 안전하다. 해당 노드가 지워질 때는 내부 참조가 0이되는 순간인데, 외부 참조가 증가한 상황에서 내부 참조가 증가나 감소로 0이 될 수 없기 때문이다.

이 사이에 포인터가 null 이라면 스택은 비어있으므로 빠져나간다. 할당되지 않은 노드를 신경쓸 필요가 없다.

(3) Head를 삭제하기 위해 다음 노드로 지정하는 CAS 시도

Head가 그 사이에 다른 Pop() 쓰레드 혹은 Push()로 변경될 수 있다. 여기서 해당 노드를 Pop() 할 수 있는 지 마지막으로 시도한다.

 - CAS 성공 시 : Head는 변경되고 OldHead는 더 이상 추가적인 외부 참조가 불가능해진다. 현재 OldHead와 같은 포인터를 외부로 참조하고 있는 쓰레드는 1(함수 반환 직전)~3에 머물러 있는 쓰레드(동시성 Concurrent 쓰레드) 일 것이다. 이 때 해당 노드는 더 이상 참조가 불가능하기 때문에 Internal 카운트가 감소할 일만 남았다. 동시성 쓰레드 개수는 OldHead.ExternalCount - 2이고 Internal 카운트를 그 만큼 증가시킨다.

(4) 내부 참조의 증가

fetch_add의 결과는 해당 아토믹 개체의 이전 값(previous)을 반환하는 데, 그 값이 - 동시성 쓰레드 개수라는 의미는 fetch_add를 하기전까지 찰나의 순간에 5)의 진입으로 모든 동시성 쓰레드의 읽기가 끝났다는 것을 의미한다. 그렇다면 해당 노드 메모리를 해제한다.

- 상황 예시 (생각해보기)

더보기

 

/* 4)에서 노드가 지워지는 상황
ex) 스택에는 데이터가 하나 있음 Head.E_Count = 1
	쓰레드 1 Pop() 진입 OldHead.E_Count = 2, Head.E_Count = 2;
	쓰레드 2 Pop() 진입 OldHead.E_Count = 3, Head.E_Count = 3;
	쓰레드 1 compare_exchange 시도, OldHead(2)와 Head(3)의 E_Count 불일치로
	Ptr->InternalCount를 1 감소, I_Count = -1, 다시 (1)로가서 OldHead(3)이된다.
	쓰레드 2 compare_exchange 성공, Head = Ptr->Next(nullptr)
	CountIncrease = OldHead(3) - 2 = 1
	InternalCount.fetch_add(1) == -1 (true, I_Count 이전 상태는 -1)
	Ptr 삭제
    쓰레드 1 (2)이후 비어있는 스택으로 return
*/

 

- CAS 실패 시 : 해당 쓰레드는 5에 진입한다. 읽기가 끝났으므로 내부 참조를 줄인다. 만약 내부 참조를 한 이후 참조가 0 즉 그 전에 1이었다면 그 쓰레드가 마지막 읽기 였으므로 노드 메모리를 해제한다.

- 상황 예시 (생각해보기)

더보기
/* (5) 동시성 쓰레드의 OldHead 읽기 종료
ex-1) 스택에는 데이터가 하나 있음 Head.E_Count = 1
	쓰레드 1 Pop() 진입 OldHead.E_Count = 2, Head.E_Count = 2;
	쓰레드 2 Pop() 진입 OldHead.E_Count = 3, Head.E_Count = 3;
	쓰레드 2 compare_exchange 성공, Head = Ptr->Next(nullptr)
		CountIncrease = OldHead(3) - 2 = 1
		// I_Count는 0인 상태, fetch_add 결과로 1로 증가
		InternalCount.fetch_add(1) == -1 (false, 다른 쓰레드가 외부에서 참조 중)
		// 삭제하지 않고 return Res;
	쓰레드 1 Ptr->InternalCount.fetch_sub(1) == 1 (true, 읽기 되었다가 더 이상 읽기가 되지 않음) 

ex-2) 쓰레드 1이 OldHead.E_Count = 2인 상황에서 
	그 사이에 다른 쓰레드가 Push를 하여 Head가 바뀜 그리고 다시 Pop()을 하였음
	쓰레드 1은 I_Count를 1 감소 (-1)하고 return
	쓰레드 1 다시 Pop() 진입, OldHead.E_Count = 3으로 증가
	compare_exchange 성공, CountIncrease = 3 - 2 = 1
	InternalCount.fetch_add(1) == -1, 해당 노드 삭제
*/

 

메모리 순서 고려하기

- Head와 외부 참조에 대해

LockFreeStackV1에서와 비슷하게 생각해볼 수 있다. Push()는 Pop() 이전에 발생했다면 Push happens before Pop가 보장되어야 한다. Push()의 CAS 성공시 memory_order_release를 지정한다.

하지만 V1과 다르게 V2에서는 외부 참조 증가를 먼저한다. 만약 Head가 Push로 인해 변경되었다면 IncreaseHeadCount에서 반드시 그 결과가 보여야할 것이다. 따라서 해당 함수의 CAS에서 memory_order_acquire를 지정한다.

IncreaseHeadCount가 항상 (3)의 CAS보다 선행되기 때문에 추가적으로 acquire를 지정할 필요가 없다. 여기서는 relaxed를 지정한다.

- 내부 참조

노드가 Pop()되었다면 해당 노드를 따로 빼두는 swap이 delete보다 선행되어야 한다. 이를 위해 명령어 재배치 등을 금지하기 위해 InternalCount.fetch_add에서 memory_order_release를 지정해준다.

또 하나 (5)에서 내부 참조가 감소하여 0이되는 상황을 올바르게 캐치하려면 증가된 상황이 선행되어야 한다. (4)에서 memory_order_release를 지정했기 때문에 여기서 memory_order_acquire를 지정해야할 것이다. 하지만 해당 상황이 되는 쓰레드는 오직 하나이므로 그 외의 상황에서는 relaxed로 두고 0이되는 특정 상황에서만 acquire를 지정한 load를 수행하여 과도한 메모리 순서 제약을 피한다.

public:
	void Push(T const& Data)
	{
		CountedNodePtr NewNode;
		NewNode.Ptr = new Node(Data);
		NewNode.ExternalCount = 1; // Head에 의한 외부 참조로 1
		NewNode.Ptr->Next = Head.load(std::memory_order::relaxed);
		while (!Head.compare_exchange_weak(NewNode.Ptr->Next, NewNode, std::memory_order::release, std::memory_order::relaxed));
	}

private:
	void IncreaseHeadCount(CountedNodePtr& OldCounter)
	{
		// 1) Head load로 인한 ExternalCount 증가에 대한 LockFree 동기화
		CountedNodePtr NewCounter;
		do
		{
			NewCounter = OldCounter;
			++NewCounter.ExternalCount;
		} while (!Head.compare_exchange_strong(OldCounter, NewCounter, std::memory_order::acquire, std::memory_order::relaxed));

		OldCounter.ExternalCount = NewCounter.ExternalCount;
	}
public:
	std::shared_ptr<T> Pop()
	{
		CountedNodePtr OldHead = Head.load(std::memory_order::relaxed);
		for (;;)
		{
			IncreaseHeadCount(OldHead);

			Node* const Ptr = OldHead.Ptr;

			if (!Ptr)
			{
				return std::shared_ptr<T>();
			}

			if (Head.compare_exchange_strong(OldHead, Ptr->Next, std::memory_order::relaxed))
			{
				std::shared_ptr<T> Res;
				Res.swap(Ptr->Data);

				const int CountIncrease = OldHead.ExternalCount - 2;

				if (Ptr->InternalCount.fetch_add(CountIncrease, std::memory_order::release) == -CountIncrease)
				{
					delete Ptr;
				}

				return Res;
			}

			else if (Ptr->InternalCount.fetch_sub(1, std::memory_order::relaxed) == 1)
			{
				auto dummy = Ptr->InternalCount.load(std::memory_order::acquire);
				delete Ptr;
			}
		}
	}

 

참고 :

C++ Concurency In Action - Anthony Williams

https://www.youtube.com/watch?v=c1gO9aB9nbs 

https://www.youtube.com/watch?v=HP2InVqgBFM&t=2621s