concurrency 네임스페이스에 포함되어 C++20에 도입된 동시성 컨테이너들이 있다.
- concurrent_queue
- concurrent_unordered_map
- concurrent_unordered_set
- concurrent_vector
등등...
C#의 경우 System.Collections.Concurrent에 이미 제공되던 기능들이다.
그 밖에도 공유 자원 수만 큼 동시성을 제한하는 Semaphore(counting_semaphore, binary_semaphore)와 함수 내에서 같이 실행하는 쓰레드 수(동시성 정도)를 제한하는 배리어 계열(latch, barrier)를 제공하게 되었다.
ThreadPool
스레드 풀의 경우 미리 스레드를 큐 형태로 만들어 놓고 작업이 할당될 때 하나씩 꺼내어 쓰는 방법이다.
새로운 스레드에 할당해서 처리하는 작업을 매번 실행할 때 스레드 리소스 생성 비용을 없앤다.
설계
1. std::thread를 사용하여 스레드 객체를 이용한다. std::thread는 생성할 때 실행할 함수를 받아 스레드 주기가 끝날 때까지 해당 함수만 실행하게 된다.
2. 그런 실행 함수는 while문을 통해 스레드 풀의 생명 주기가 끝나는 flag, ExitFlag가 false 일 경우 계속 실행되도록 한다.
3. ThreadPool을 통한 특정 함수를 받아 작업을 할당하는 Queue(std::function<void()>) 함수가 필요하다.
4. 작업을 보관하는 컨테이너는 concurrent_queue를 사용하도록 한다.
5. 작업을 Queue하였을 때 대기중인 스레드 풀의 스레드 중 하나를 꺠워 작업을 실행하게 해야한다. 이 때 사용할 수 있는 것이 counting_semaphore이다.
6. counting_semaphore는 큐잉 된 작업 수만큼 공유 자원 수를 가진다. Queue가 될 때는 Semahpore의 release()로 공유 자원 수를 늘리고 acquire()를 통해 acquire를 시도하는 스레드 중 하나가 깨어나 작업을 실행할 수 있게 한다.
7. 이떄 까지 큐잉한 모든 작업이 끝날 때까지 대기하는 WaitAll 정도의 도움 함수가 있으면 좋겠다.
#pragma once
#include <functional>
#include <thread>
#include <queue>
#include <mutex>
#include <concurrent_queue.h>
#include <semaphore>
template<typename TaskType = std::function<void()>>
class ThreadPool
{
using ConcurrentQueue = concurrency::concurrent_queue<TaskType>;
using Semaphore = std::counting_semaphore<>;
private:
int NumOfThreads;
bool ExitFlag = false;
std::atomic<int> taskCounts;
std::mutex WaitMutex;
std::vector<std::thread> Workers;
std::shared_ptr<ConcurrentQueue> TaskContainer;
std::shared_ptr<Semaphore> TaskCounter;
public:
ThreadPool()
{
taskCounts.store(0);
NumOfThreads = std::thread::hardware_concurrency() * 2;
TaskContainer = std::make_shared<ConcurrentQueue>();
TaskCounter = std::make_shared<Semaphore>(0);
std::atomic<std::shared_ptr<int>> a;
for (size_t i = 0; i < NumOfThreads; ++i)
{
Workers.emplace_back([this]() { this->Execute(); });
}
}
~ThreadPool()
{
ExitFlag = true;
TaskCounter->release(NumOfThreads);
for (auto& worker : Workers)
{
worker.join();
}
}
void WaitAll()
{
while (taskCounts != 0)
{
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
void Queue(TaskType action)
{
taskCounts.fetch_add(1);
TaskContainer->push(action);
TaskCounter->release();
}
int GetNumOfThreads() const
{
return NumOfThreads;
}
private:
void Execute()
{
while (!ExitFlag)
{
TaskCounter->acquire();
if (ExitFlag)
return;
TaskType task;
if (TaskContainer->try_pop(task) == false)
continue;
task();
taskCounts.fetch_sub(1);
}
}
};
'Advanced C++' 카테고리의 다른 글
[C++] libcurl (1) 다운로드 및 설치 (0) | 2022.04.17 |
---|---|
[C++] 멀티 스레드 응용 프로그램 : 은행 창구 시스템 구현하기 (0) | 2022.03.27 |
[C++] 정규식 표현 std::regex으로 문자열 찾기 (0) | 2022.02.07 |
[C++] 메모리 관리 (4) 정적 메모리 풀 (0) | 2021.10.10 |
[C++] 메모리 관리 (3) 주소 기반 초기화 placement new (0) | 2021.10.06 |