Computer Science 기본 지식/운영체제

[C++ Thread] Lock Free Programming - (2) Lock Free Queue

로파이 2021. 10. 18. 17:00

LockFree 큐를 설계해본다.

 

LockFree Queue

LockFree Stack에 이어서 큐와 같은 경우 FIFO 특징이 있기 때문에 맨 앞을 가리키는 노드(Head)와 맨 뒤를 가리키는 노드(Tail)이 있어야한다.

 

큐의 설계

처음 큐에는 Head와 Tail이 일치하는 더미 노드가 하나 존재하며 데이터를 가지지 않는다.

Push() : 현재 Tail의 노드(OldTail)에 데이터를 삽입하고 새로운 노드를 Tail로 만들어준다. 새로운 노드에 데이터를 할당하는 것이 아니다.

Pop(): Head와 Tail이 일치하지 않다면, 큐는 적어도 하나의 데이터가 있는 것이다. 이 때 Head는 OldHead가 되고 OldHead의 다음 노드가 Head가 된다.

 

Single Producer Single Consumer Queue

// Single Producer single consumer
template<typename T>
class LockFreeQueueSPSC
{
private:
	struct Node
	{
		std::shared_ptr<T> Data;
		Node* Next;
		Node() : Next(nullptr)
		{}
	};

	std::atomic<Node*> Head;
	std::atomic<Node*> Tail;

	Node* PopHead()
	{
		Node* const OldHead = Head.load();
		if (OldHead == Tail.load())
		{
			return nullptr;
		}
		Head.store(OldHead->Next);
		return OldHead;
	}

public:
	LockFreeQueueSPSC() : Head(new Node), Tail(Head.load()) {}
	~LockFreeQueueSPSC()
	{
		while (Node* const OldHead = Head.load())
		{
			Head = OldHead->Next;
			delete OldHead;
		}
	}
	std::shared_ptr<T> Pop()
	{
		Node* OldHead = PopHead();
		if (!OldHead)
		{
			return std::shared_ptr<T>();
		}
		std::shared_ptr<T> const Res(OldHead->Data);
		delete OldHead;
		return Res;
	}
	void Push(const T& NewValue)
	{
		std::shared_ptr<T> NewData(std::make_shared<T>(NewValue));
		Node* p = new Node;
		Node* const OldTail = Tail.load();
		OldTail->Data.swap(NewData);
		OldTail->Next = p;
		Tail.store(p);
	}
};

첫번째 버전의 큐를 설계하였다. Head와 Tail은 atomic 개체로 되어 있다. 이름에도 볼 수 있듯이 해당 큐는 Single Producer Single Consumer에 적합한 큐로 Push()와 Pop()이 순차적으로 진행되면 문제가 없다. 하지만 멀티쓰레드에서는 당연하게 경쟁조건이 발생하기 때문에 안전하지 않다. 두 쓰레드가 동시에 Push를 하게 되면 동일한 Tail에 새로운 값을 할당할 것이다 (OldTail->Data.swap(NewData)). Pop()의 경우도 OldHead를 두 번 지우려 할 것이기 때문에 미정의 행동을 유발한다.

 

경쟁 조건이 발생하는 Head와 Tail에 대해서 Push()의 경우 데이터를 할당할 때 Tail이 바뀌지 않았는지를 확인해야할 것이고 Pop()의 경우에도 Head가 바뀌지 않았는지 확인해야할 것이다.

 

Push()에서 다중 쓰레드를 핸들링하기

Data를 atomic하게 만들고 NewData를 얻는 것을 CAS로 시도한다. OldTail->Data에 NewData로 바꾸는 CAS가 성공하였다면, OldTail의 Next를 새로운 노드 p로 할당하고 노드 p를 Tail로 만든다. CAS가 실패했다면, 다른 쓰레드에서 OldTail->Data에 이미 할당을 하였기 때문에 다시 루프를 돌아 Tail을 업데이트하고 CAS를 시도한다.

shared_ptr이 lock-free하지 않다는 것을 가정하고 Data를 atomic하게 만들기 위해 atomic<shared_ptr<T>>가 아닌atomic<T*>로 만든다. 이때 새로 할당한 데이터의 메모리를 자동으로 관리하기 위해 shared_ptr를 unique_ptr로 만들고 CAS가 성공했을 때 unique_ptr이 메모리를 release해서 더 이상 자신이 메모리를 관리하지 않게 한다. (메모리는 큐에의해 관리된다.)

	void Push_Broken(const T& NewValue)
	{
		// Data를 compare_exchange_strong()으로 변경해야함
		// unique_ptr로 만들고 성공했을 때 atomic<T*> Data로 관리
		unique_ptr<T> NewData(new T(NewValue));
		CountedNodePtr NewNext;
		NewNext.Ptr = new Node;
		NewNext.ExternalCount = 1; // Tail에 의해

		for (;;)
		{
			// 1. Tail 포인터 로드
			Node* const OldTail = Tail.load();

			// 2. OldTail 역참조하여 Data가 nullptr인지 CAS
			// 3. 다른 쓰레드에서 Pop() 인해 2의 OldTail이 댕글링 포인터가 된다.
			/*  큐에 데이터가 하나 존재 (Node1(Data), Node2(null))
				스레드 1 : OldTail은 Node2
				스레드 2 : Push 또다른 노드 (Node1(Data), Node2(Data2), Node3(null)) 
				스레드 3 : Pop(), Pop()   (Head == Tail == Node3(null))
				스레드 1 : OldTail은 삭제된 포인터(Node2)를 가리키고 있음
			*/
			T* OldData = nullptr;
			if (OldTail->Data.compare_exchange_strong(OldData, NewData.get())) // 데이터 삽입
			{
				// 큐 데이터 구조에서
				// 이전 노드의 Next로 인해 NewNext의 외부 참조가 하나 더 생긴다.
				OldTail->Next = NewNext;

				Tail.store(NewNext.Ptr);
				NewData.release(); // 메모리 릴리즈
				break;
			}
		}
	}

첫번째 멀티 쓰레드를 위한 Push 설계로 1~3의 내용을 주목하자. 이제 Data는 atomic<T*>이기 때문에 compare_exchange_strong으로 값의 변경을 CAS로 시도할 수 있다. 문제는 OldTail이 댕글링 포인터가 될 수 있다는 것인데, 2)에서 역참조를 하고 있으므로 댕글링 포인터의 역참조는 미정의 행동을 유발한다. 위 프로그램에서 기존에 OldTail이 Tail과 같은지 CAS를 하던 것과 다르게 Data를 CAS하고 있기 때문에 OldTail이 Tail과 같다는 보장을 할 수 가 없다. 따라서 다른 쓰레드에서 해당 노드를 Pop()한다면 OldTail은 댕글링 포인터가 된다.

 

큐의 자료구조에서는 새로운 노드를 삽입할 때 Tail에 의해 외부 참조가 하나 생기는 것 뿐만 아니라 Tail의 그 전 노드의 Next에 의해서도 외부 참조가 생긴다. 따라서 두 외부 참조를 고려한 참조 카운팅 기법으로 OldTail이 너무 일찍 삭제되는 것을 방지해야한다. 이 두 외부 참조 카운터(Tail에 의해, 혹은 Tail의 그 전 노드에 의해)를 구분하여 카운터 개수를 세는 것을 Node 구조안에 반영한다. 두 개의 외부 참조 카운터가 있고 각각의 카운터는 외부로 포인터가 읽어질 때마다 외부 참조를 증가시킨다. 그리고 각 외부 참조 카운터가 파괴될 때, 그 숫자를 하나 감소시키고 해당 카운터에서 세어진 외부 참조 개수를 내부 참조에 더하는 것으로 반영한다. 이 부분은 더 이상 해당 외부 참조 카운터에 의해 참조되지 않을 것을 내부 참조에 반영하는 것으로 LockFree 스택과 같다.

 

만약 해당 노드의 외부 참조 카운터가 모두 파괴되어 0을 가리키고 내부 참조 또한 0이라면 해당 노드를 삭제해도 안전하다.

 

Node 구조

	struct NodeCounter
	{
		unsigned InternalCount : 30;   // use 30 bits
		unsigned ExternalCounters : 2; // use 2 bits (최대 2개를 가지므로 2비트로 표현 가능)
	};

	struct Node
	{
		atomic<T*> Data;
		atomic<NodeCounter> Count; // Internal Count + External Counters
		CountedNodePtr Next;

		Node()
		{
			NodeCounter NewCount;
			NewCount.InternalCount = 0;
			NewCount.ExternalCounters = 2; // Tail Node와 Tail의 Previous Node의 Next
			Count.store(NewCount);

			Next.Ptr = nullptr;
			Next.ExternalCount = 0;
		}
	};

각각의 Node는 NodeCounter라는 구조체를 가진다. 해당 구조체는 InternalCount와 ExternalCounters를 가지는데, 총 4바이트 크기에 InteralCount가 30비트를 사용하고 ExternalCounters가 2 비트를 사용한다. ExternalCounters는 노드를 생성할 때 2로 초기화되며 이보다 커질 일이 없기 때문이다. Lock Free 스택과 마찬가지로 Node의 포인터는 CountedNodePtr로 외부에 노출되며 이는 ExternalCounter중 하나에 해당한다.

 

다음은 Push()를 위 Node 구조로 설계한 것이다.

	// 외부로 노출되는 포인터
	struct CountedNodePtr
	{
		int ExternalCount;
		Node* Ptr;
	};

	atomic<CountedNodePtr> Head;
	atomic<CountedNodePtr> Tail;
public:
	void Push(const T& NewValue)
	{
		unique_ptr<T> NewData(new T(NewValue));
		CountedNodePtr NewNext;
		NewNext.Ptr = new Node;
		NewNext.ExternalCount = 1;

		CountedNodePtr OldTail = Tail.load();
		for (;;)
		{
			// OldTail에 의해 외부 참조가 하나 증가하였다.
			IncreaseExternalCount(Tail, OldTail);

			// 데이터 삽입 시도
			T* OldData = nullptr;
			if (OldTail.Ptr->Data.compare_exchange_strong(OldData, NewData.get()))
			{
				OldTail.Ptr->Next = NewNext;
				OldTail = Tail.exchange(NewNext);

				// 외부 참조 카운터의 수를 하나 줄인다
				FreeExternalCounter(OldTail);
				NewData.release();
				break;
			}
            
			// 외부 참조 읽기 종료 : 내부 참조 감소
			OldTail.Ptr->ReleaseRef();
		}
	}

기본 논리는 다음과 같다.

1) Tail을 읽어드리고 OldTail에 할당한다. OldTail에 의해 Tail은 외부 참조가 하나 증가하였다. IncreaseExternalCount는 Tail이 OldTail과 같다면 Tail을 OldTail에서 외부 참조가 하나 증가한 CountedNodePtr로 바꿔진다. 이는 LockFree 스택에서와 같다.

2) 데이터 삽입을 CAS로 시도한다.

3) 성공하였다면 OldTail은 이제 더 이상 Tail이 아니기 때문에 외부 참조 카운터 하나가 없어진다.

4) 실패하였다면 외부 참조 읽기가 끝났다는 것을 의미한다. 내부 참조를 하나 줄인다.

 

Node의 ReleaseRef()

락 프리 스택에서 내부 참조를 하나 줄이는 것과 동일한 연산이다. 하지만 삭제가 가능한지 확인하려면 큐의 노드 구조에서는 ExternalCounters 외부 참조 카운터가 0인지 까지 확인하는 작업이 필요하다.

    struct Node
	{
		void ReleaseRef()
		{
			NodeCounter OldCounter = Count.load(std::memory_order_relaxed);
			NodeCounter NewCounter;
			// Count -> NewCounter(내부 참조가 하나 감소한)
			do
			{
				NewCounter = OldCounter;
				--NewCounter.InternalCount;
			} while (!Count.compare_exchange_strong(OldCounter, NewCounter, std::memory_order_acquire, std::memory_order_relaxed));
			
			// 내부 참조 0, 외부 카운터 0이면 삭제
			if (NewCounter.InternalCount == 0 && NewCounter.ExternalCounters == 0)
			{
				delete this;
			}
		}
	};

- 락 프리 스택에서 Pop()의 CAS가 실패했을 때

내부 참조를 줄이고 내부 참조가 0인지 확인하여 삭제 가능한 지 확인했다.

else if (Ptr->InternalCount.fetch_sub(1) == 1)
{
	delete Ptr;
}

 

- 외부 참조 증가

IncreaseExternalCount는 스택의 멤버함수 IncreaseHeadCount와 같으며 큐에서는 정적으로 선언하여 임의의 노드에 대해 외부 참조를 증가시킨다.

	static void IncreaseExternalCount(std::atomic<CountedNodePtr>& Counter, CountedNodePtr& OldCounter)
	{
		CountedNodePtr NewCounter;
		do
		{
			NewCounter = OldCounter;
			++NewCounter.ExternalCount;
		} while (!Counter.compare_exchange_strong(OldCounter, NewCounter, std::memory_order_acquire, std::memory_order_relaxed));
		
		OldCounter.ExternalCount = NewCounter.ExternalCount;
	}

 

- 외부 참조 카운터 해제 및 내부 참조 증가

스택에서 노드가 Pop()이 성공했을 때 더 이상 해당 노드의 외부 참조가 이루어지지 않으므로 모든 동시성 쓰레드 개수 ExternalCount - 2 만큼 내부 참조를 증가시켰다.(미래에 다른 쓰레드의 내부 참조가 완료될 것이기 때문에)

	static void FreeExternalCounter(CountedNodePtr& OldNodePtr)
	{
		Node* const Ptr = OldNodePtr.Ptr;
		const int CountIncrease = OldNodePtr.ExternalCount - 2;
		NodeCounter OldCounter = Ptr->Count.load(std::memory_order_relaxed);

		NodeCounter NewCounter;
		do
		{
			NewCounter = OldCounter;
			--NewCounter.ExternalCounters; // 외부 참조 카운터 삭제
			NewCounter.InternalCount += CountIncrease; // 해당 내부 참조에 순 외부 참조 증가 수 만큼 더해준다.
		} while (!Ptr->Count.compare_exchage_strong(OldCounter, NewCounter, std::memory_order_acquire, std::memory_order_relaxed));

		// 삭제 조건
		if (NewCounter.InteralCount == 0 && NewCounter.ExternalCounters == 0)
		{
			delete Ptr;
		}
	}

- Push()의 맹점

Push()는 이제 경쟁 조건이 발생하지 않는다. 하지만 코드를 보면 Lock-Free를 위해 설계했지만 살아있는 락이 존재할 수 있는 구간이 생긴다.

		CountedNodePtr OldTail = Tail.load();
		for (;;)
		{
			IncreaseExternalCount(Tail, OldTail);

			T* OldData = nullptr;
			if (OldTail.Ptr->Data.compare_exchange_strong(OldData, NewData.get()))
			{
				// 이 구간에서 Tail이 바뀔 때까지 다른 쓰레드는 Push()를 할 수 없다.
				OldTail.Ptr->Next = NewNext;
				//
				OldTail = Tail.exchange(NewNext);

				FreeExternalCounter(OldTail);
				NewData.release();
				break;
			}

			OldTail.Ptr->ReleaseRef();
		}

OldTail.Ptr->Data의 CAS 성공 이후와 Tail의 변경까지의 갭이 락이 살아있는 공간이다. 다른 쓰레드가 Push()를 하려고 할때 CAS에서 해당 Tail에 데이터가 이미 있는 것을 보고 루프를 다시 돌 것이다. 만약 CAS를 성공시킨 Pop() 쓰레드가 이 구간에서 블락이 된다면 다른 쓰레드는 해당 쓰레드가 Tail을 바꿀 때까지 바쁜 대기를 하게 될 것이다. Pop()은 Lock-Free라고 할 수 없다.

 

다른 쓰레드가 도와주는 Push()

Tail을 업데이트하기 위해서는 현재 Tail노드의 다음이 새로운 노드(OldTail.Ptr->Next = NewNext)가 먼저 실행되어야 한다. 이 일을 다른 쓰레드가 해주는 것이다.

Help From Thread B

문제가 되는 상황은 Thread A가 CAS를 통해 OldTail에 데이터 삽입을 성공했지만 다음 포인터를 Thread A에서 생성한 새로운 노드로 설정하지 않았다. 그 이후 Thread A는 스케줄러에 의해 블락된다. Thread B는 Thread A가 다음 포인터를 바꾸고 Tail은 업데이트하기 전까지 더 이상 진행할 수 없다. (락)

 

그렇다면, Thread B에서 할 수 있는 일은 이 상황을 알아차려 데이터 삽입은 됬지만 OldTail의 다음 포인터가 Null로 설정이 되었을 때 (Thread A가 데이터 삽입만 한 상태)를 CAS로 알아차려 Thread B가 생성한 노드를 다음 포인터로 설정하고 Tail로 설정할 수 있을 것이다.

 

하나씩 바꾸어 보자면, OldTail->Next(CountedNodePtr)은 이제 CAS를 사용할 수 있어야하므로 atomic으로 변경해야한다. Pop()에서 일부 라인을 수정한다.

	struct Node
	{
		atomic<T*> Data;
		atomic<NodeCounter> Count; // Internal Count + External Counters
		atomic<CountedNodePtr> Next; // 이제 Next는 atomic이다.
	};
	std::unique_ptr<T> Pop()
	{
		CountedNodePtr OldHead = Head.load(std::memory_order_relaxed);

		for (;;)
		{
			IncreaseExternalCount(Head, OldHead);

			Node* const Ptr = OldHead.Ptr;

			// 큐가 비어있음
			if (Ptr == Tail.load().Ptr)
			{
				Ptr->ReleaseRef();
				return std::unique_ptr<T>();
			}

			// Ptr->Next를 설정하기전, 먼저 load를 해준다.
			CountedNodePtr Next = Ptr->Next.load();
			if (Head.compare_exchange_strong(OldHead, Next))
			{
				T* const Res = Ptr->Data.exchange(nullptr);
				FreeExternalCounter(OldHead);
				return std::unique_ptr<T>(Res);
			}

			Ptr->ReleaseRef();
		}
	}

CountedNodePtr Next = Ptr->Next.load(); 부분이 수정되었다.

 

다른 쓰레드에서 도움을 받는 Push() 설계

먼저 그림에서 가정한 상황 Thread A가 CAS로 데이터 삽입을 완료하고 Next 포인터를 바꾸고 Tail을 업데이트하는 로직을 보자.

		for (;;)
		{
			IncreaseExternalCount(Tail, OldTail);

			// (6) 데이터 삽입을 시도한다.
			T* OldData = nullptr;
			if (OldTail.Ptr->Data.compare_exchange_strong(OldData, NewData.get()))
			{
				// (7) OldTail->Next를 자신이 생성한 노드 NewNext로 바꾸는 것을 시도한다.
				CountedNodePtr OldNext = {}; // 다음은 비어있어야한다.
				if (!OldTail.Ptr->Next.compare_exchange_strong(OldNext, NewNext))
				{
					// (8) 다른 쓰레드에 의해 노드가 삽입되었으므로 현재 쓰레드에서 노드를 삽입하지 않는다.
					delete NewNext.Ptr;
					NewNext = OldNext; // 다른 쓰레드에 의해 설정된 OldTail->Next 포인터를 사용
				}

				// (9) Tail을 업데이트한다.
				SetNewTail(OldTail, NewNext);
				NewData.release(); // 데이터 삽입은 성공했다.
				break;
			}

일반적인 Tail에 데이터를 삽입과 다른 쓰레드의 개입 여부를 확인하는 Thread A의 로직이다.

(6) : 데이터 삽입을 성공 하였다.

(7) : OldTail->Next를 자신의 쓰레드에서 생성한 노드 NewNext로 바꾸는 것을 CAS로 시도한다.

    - 만약 성공했다면, 이전의 Push() 설계와 같은 논리로 (9)에서 Tail을 업데이트하는 것으로 끝난다.

    - 만약 실패했다면, 이 시점에서 실패할 이유는 "도움을 받는 Push()" 설계로 다른 쓰레드가 Next에 노드를 할당해서 비어 있는 상태가 아니라는 뜻이다. 그렇다면 자신이 생성한 동적 할당 노드는 필요없으므로 삭제시키고 Tail로 설정할 노드는 CAS의 실패로 OldNext에는 실제 OldTail->Next가 설정됨으로 OldNext가 실제 Tail노드로 업데이트 시켜야할 것이다. (NewNext = OldNext)

(9) : Tail 노드를 업데이트한다. 데이터 삽입은 성공했으니 unique_ptr를 release한다.

			else
			{
				// (10) 지금 OldTail에 데이터는 들어가 있는 상태이다.
				CountedNodePtr OldNext = {};
				// Tail 노드가 업데이트가 안되어 있을 수 있으니 다른 쓰레드에서 도와주자
				if (OldTail.Ptr->Next.compare_exchange_strong(OldNext, NewNext))
				{
					// (11) 다른 쓰레드에서 OldTail->Next를 바꾸었다.
					OldNext = NewNext;
					NewNext.Ptr = new Node; // 현재 쓰레드가 노드를 삽입한 것은 아니므로 새로운 노드를 할당한다.
				}

				// (12) Tail을 업데이트한다.
				SetNewTail(OldTail, OldNext);
			}
		} // End for
	} // End Push

다른 쓰레드(Thread B)가 Next에 노드 삽입을 도와주는 로직이다.

(10) : CAS의 실패로 OldTail에 데이터가 들어가 있는 상태이다. 그러나 OldTail->Next는 설정되어 있지 않을 수 있다. Next 포인터의 CAS 시도로 다음 노드가 비어있는지 확인한다.

(11) : Thread B에서 OldTail->Next를 바꾸었다. Tail로 설정할 OldNext는 NewNext가 되고 Thread B가 Thread A의 Push()를 도와준 것이지 자신의 Push()를 한 것이 아니기 때문에 노드를 다시 생성한다.

(12) : Tail을 업데이트 한다.

 

다음은 실제 Tail을 업데이트 하는 함수이다.

// (1) 실제 Tail노드가 업데이트 되는 함수
	void SetNewTail(CountedNodePtr& OldTail, CountedNodePtr const& NewTail)
	{
		Node* const CurrentTailPtr = OldTail.Ptr;

		// (2) 다른 쓰레드에 의해 Tail이 바뀌지 않았을 때만 loop를 돈다.
		while (!Tail.compare_exchange_weak(OldTail, NewTail)
			&& OldTail.Ptr == CurrentTailPtr);

		// (3) 현재 쓰레드가 Tail을 업데이트했을 때
		if (OldTail.Ptr == CurrentTailPtr)
		{
			// (4) 해당 OldTail의 외부 참조 카운터를 제거한다.
			FreeExternalCounter(OldTail);
		}
		else
		{
			// (5) 다른 쓰레드가 Tail을 바꾸었으므로 현재 OldTail은 읽기만 종료한 것이다.
			CurrentTailPtr->ReleaseRef();
		}
	}

(2) : 다른 쓰레드에 의해 Tail이 바뀌었다면 OldTail은 CAS 결과로 실제 Tail 값으로 바뀐다. 그렇다면 OldTail이 바뀌어 CurrentTailPtr != OldTail.Ptr인 게 되고 while 루프를 돌지 않게 된다. Tail의 CAS는 또 다른 Push()로 외부 참조 횟수가 변한 것을 반영하려고 하는 것이고 OldTail.Ptr == CurrentTailPtr을 비교하는 이유는 Tail 노드를 설정하는 쓰레드는 A와 B 둘 중 하나만 Tail을 업데이트하게 하는 것을 보장하려고 한다.

 

(3) : 이 다음은 Thread A와 B 둘 중 실제 Tail로 설정한 쓰레드는 Tail의 외부 참조 카운터를 제거(4)할 것이고 그렇지 않은 쓰레드는 외부 참조 읽기만 종료한 것이므로 내부 참조를 감소하는 ReleaseRef()를 호출한다.

 

LockFree Queue에 사용된 전체 코드

더보기
#pragma once
#include "Define.h"
// Single Producer single consumer
template<typename T>
class LockFreeQueueSPSC
{
private:
	struct Node
	{
		std::shared_ptr<T> Data;
		Node* Next;
		Node() : Next(nullptr)
		{}
	};

	std::atomic<Node*> Head;
	std::atomic<Node*> Tail;

	Node* PopHead()
	{
		Node* const OldHead = Head.load();
		if (OldHead == Tail.load())
		{
			return nullptr;
		}
		Head.store(OldHead->Next);
		return OldHead;
	}

public:
	LockFreeQueueSPSC() : Head(new Node), Tail(Head.load()) {}
	~LockFreeQueueSPSC()
	{
		while (Node* const OldHead = Head.load())
		{
			Head = OldHead->Next;
			delete OldHead;
		}
	}
	std::shared_ptr<T> Pop()
	{
		Node* OldHead = PopHead();
		if (!OldHead)
		{
			return std::shared_ptr<T>();
		}
		std::shared_ptr<T> const Res(OldHead->Data);
		delete OldHead;
		return Res;
	}
	void Push(const T& NewValue)
	{
		std::shared_ptr<T> NewData(std::make_shared<T>(NewValue));
		Node* p = new Node;
		Node* const OldTail = Tail.load();
		OldTail->Data.swap(NewData);
		OldTail->Next = p;
		Tail.store(p);
	}
};

template<typename T>
class LockFreeQueue
{
private:
	struct Node;

	// 외부로 노출되는 포인터
	struct CountedNodePtr
	{
		int ExternalCount;
		Node* Ptr;
	};
	
	atomic<CountedNodePtr> Head;
	atomic<CountedNodePtr> Tail;

	struct NodeCounter
	{
		unsigned InternalCount : 30;   // use 30 bits
		unsigned ExternalCounters : 2; // use 2 bits (최대 2개를 가지므로 2비트로 표현 가능)
	};

	struct Node
	{
		atomic<T*> Data;
		atomic<NodeCounter> Count; // Internal Count + External Counters
		atomic<CountedNodePtr> Next;

		Node()
			:
			Next(CountedNodePtr({}))
		{
			NodeCounter NewCount;
			NewCount.InternalCount = 0;
			NewCount.ExternalCounters = 2; // Tail Node와 Tail의 Previous Node의 Next
			Count.store(NewCount);
		}

		void ReleaseRef()
		{
			NodeCounter OldCounter = Count.load(std::memory_order_relaxed);
			NodeCounter NewCounter;
			// Count -> NewCounter(내부 참조가 하나 감소한)
			do
			{
				NewCounter = OldCounter;
				--NewCounter.InternalCount;
			} while (!Count.compare_exchange_strong(OldCounter, NewCounter, std::memory_order_acquire, std::memory_order_relaxed));

			// 내부 참조 0, 외부 카운터 0이면 삭제
			if (NewCounter.InternalCount == 0 && NewCounter.ExternalCounters == 0)
			{
				delete this;
			}
		}
	};

public:
	LockFreeQueue()
	{
		CountedNodePtr NewNode;
		NewNode.Ptr = new Node();
		NewNode.ExternalCount = 0;
		Head.store(NewNode);
		Tail = Head.load();
	}
	~LockFreeQueue()
	{
		CountedNodePtr Cur = Head.load();
		while (Cur.Ptr)
		{
			Node* Temp = Cur.Ptr;
			Cur = Cur.Ptr->Next;
			delete Temp;
		}
	}
	size_t Size() const
	{
		size_t Size = 0;
		CountedNodePtr Cur = Head.load();
		while (Cur.Ptr)
		{
			++Size;
			Cur = Cur.Ptr->Next;
		}
		assert(Size > 0);
		return Size - 1;
	}
private:
	static void IncreaseExternalCount(std::atomic<CountedNodePtr>& Counter, CountedNodePtr& OldCounter)
	{
		CountedNodePtr NewCounter;
		do
		{
			NewCounter = OldCounter;
			++NewCounter.ExternalCount;
		} while (!Counter.compare_exchange_strong(OldCounter, NewCounter, std::memory_order_acquire, std::memory_order_relaxed));

		OldCounter.ExternalCount = NewCounter.ExternalCount;
	}

	static void FreeExternalCounter(CountedNodePtr& OldNodePtr)
	{
		Node* const Ptr = OldNodePtr.Ptr;
		const int CountIncrease = OldNodePtr.ExternalCount - 2;
		NodeCounter OldCounter = Ptr->Count.load(std::memory_order_relaxed);

		NodeCounter NewCounter;
		do
		{
			NewCounter = OldCounter;
			--NewCounter.ExternalCounters; // 외부 참조 카운터 삭제
			NewCounter.InternalCount += CountIncrease; // 해당 내부 참조에 순 외부 참조 증가 수 만큼 더해준다.
		} while (!Ptr->Count.compare_exchange_strong(OldCounter, NewCounter, std::memory_order_acquire, std::memory_order_relaxed));

		// 삭제 조건
		if (NewCounter.InternalCount == 0 && NewCounter.ExternalCounters == 0)
		{
			delete Ptr;
		}
	}

public:
	std::unique_ptr<T> Pop()
	{
		CountedNodePtr OldHead = Head.load(std::memory_order_relaxed);

		for (;;)
		{
			IncreaseExternalCount(Head, OldHead);

			Node* const Ptr = OldHead.Ptr;

			// 큐가 비어있음
			if (Ptr == Tail.load().Ptr)
			{
				Ptr->ReleaseRef();
				return std::unique_ptr<T>();
			}

			CountedNodePtr Next = Ptr->Next.load();
			if (Head.compare_exchange_strong(OldHead, Next))
			{
				T* const Res = Ptr->Data.exchange(nullptr);
				FreeExternalCounter(OldHead);
				return std::unique_ptr<T>(Res);
			}

			Ptr->ReleaseRef();
		}
	}

private:
	// (1) 실제 Tail노드가 업데이트 되는 함수
	void SetNewTail(CountedNodePtr& OldTail, CountedNodePtr const& NewTail)
	{
		Node* const CurrentTailPtr = OldTail.Ptr;

		// (2) 다른 쓰레드에 의해 Tail이 바뀌지 않았을 때만 loop를 돈다.
		while (!Tail.compare_exchange_weak(OldTail, NewTail)
			&& OldTail.Ptr == CurrentTailPtr);

		// (3) 현재 쓰레드가 Tail을 업데이트했을 때
		if (OldTail.Ptr == CurrentTailPtr)
		{
			// (4) 해당 OldTail의 외부 참조 카운터를 제거한다.
			FreeExternalCounter(OldTail);
		}
		else
		{
			// (5) 다른 쓰레드가 Tail을 바꾸었으므로 현재 OldTail은 읽기만 종료한 것이다.
			CurrentTailPtr->ReleaseRef();
		}
	}

public:
	void Push(const T& NewValue)
	{
		std::unique_ptr<T> NewData(new T(NewValue));
		CountedNodePtr NewNext;
		NewNext.Ptr = new Node;
		NewNext.ExternalCount = 1;
		CountedNodePtr OldTail = Tail.load();

		for (;;)
		{
			IncreaseExternalCount(Tail, OldTail);

			// (6) 데이터 삽입을 시도한다.
			T* OldData = nullptr;
			if (OldTail.Ptr->Data.compare_exchange_strong(OldData, NewData.get()))
			{
				// (7) OldTail->Next를 자신이 생성한 노드 NewNext로 바꾸는 것을 시도한다.
				CountedNodePtr OldNext = {}; // 다음은 비어있어야한다.
				if (!OldTail.Ptr->Next.compare_exchange_strong(OldNext, NewNext))
				{
					// (8) 다른 쓰레드에 의해 노드가 삽입되었으므로 현재 쓰레드에서 노드를 삽입하지 않는다.
					delete NewNext.Ptr;
					NewNext = OldNext; // 다른 쓰레드에 의해 설정된 OldTail->Next 포인터를 사용
				}

				// (9) Tail을 업데이트한다.
				SetNewTail(OldTail, NewNext);
				NewData.release(); // 데이터 삽입은 성공했다.
				break;
			}
			else
			{
				// (10) 지금 OldTail에 데이터는 들어가 있는 상태이다.
				CountedNodePtr OldNext = {};

				// Tail 노드가 업데이트가 안되어 있을 수 있으니 다른 쓰레드에서 도와주자
				if (OldTail.Ptr->Next.compare_exchange_strong(OldNext, NewNext))
				{
					// (11) 다른 쓰레드에서 OldTail->Next를 바꾸었다.
					OldNext = NewNext;
					NewNext.Ptr = new Node; // 현재 쓰레드가 노드를 삽입한 것은 아니므로 새로운 노드를 할당한다.
				}

				// (12) Tail을 업데이트한다.
				SetNewTail(OldTail, OldNext);
			}
		} // End for
	} // End Push

public:
	void PushNotLockFree(const T& NewValue)
	{
		unique_ptr<T> NewData(new T(NewValue));
		CountedNodePtr NewNext;
		NewNext.Ptr = new Node;
		NewNext.ExternalCount = 1;

		CountedNodePtr OldTail = Tail.load();
		for (;;)
		{
			// OldTail에 의해 외부 참조가 하나 증가하였다.
			IncreaseExternalCount(Tail, OldTail);

			// 데이터 삽입 성공
			T* OldData = nullptr;
			if (OldTail.Ptr->Data.compare_exchange_strong(OldData, NewData.get()))
			{
				OldTail.Ptr->Next = NewNext;
				OldTail = Tail.exchange(NewNext);

				// 외부 참조 카운터의 수를 하나 줄인다
				FreeExternalCounter(OldTail);
				NewData.release();
				break;
			}

			// 외부 참조 읽기 종료 : 내부 참조 감소
			OldTail.Ptr->ReleaseRef();
		}
	}
public:
	void Push_Broken(const T& NewValue)
	{
		// Data를 compare_exchange_strong()으로 변경해야함
		// unique_ptr로 만들고 성공했을 때 atomic<T*> Data로 관리
		unique_ptr<T> NewData(new T(NewValue));
		CountedNodePtr NewNext;
		NewNext.Ptr = new Node;
		NewNext.ExternalCount = 1; // Tail에 의해

		for (;;)
		{
			// 1. Tail 포인터 로드
			Node* const OldTail = Tail.load();

			// 2. OldTail 역참조하여 Data가 nullptr인지 CAS
			// 3. 다른 쓰레드에서 Pop() 인해 2의 OldTail이 댕글링 포인터가 된다.
			/*  큐에 데이터가 하나 존재 (Node1(Data), Node2(null))
				스레드 1 : OldTail은 Node2
				스레드 2 : Push 또다른 노드 (Node1(Data), Node2(Data2), Node3(null))
				스레드 3 : Pop(), Pop()   (Head == Tail == Node3(null))
				스레드 1 : OldTail은 삭제된 포인터(Node2)를 가리키고 있음
			*/
			T* OldData = nullptr;
			if (OldTail->Data.compare_exchange_string(OldData, NewData.get())) // 데이터 삽입
			{
				// 큐 데이터 구조에서
				// 이전 노드의 Next로 인해 NewNext의 외부 참조가 하나 더 생긴다.
				OldTail->Next = NewNext;

				Tail.store(NewNext.Ptr);
				NewData.release(); // 메모리 릴리즈
				break;
			}
		}
	}
};

 

쓰레드 64개를 생성해서 해당 락프리 큐를 사용한 결과, 동기화가 유효하다.

 

참고 : C++ Concurrent In Action : Anthony Williams