다음을 참고하여 작성하였습니다.
모두의 코드, 쓰레드 풀: modoocode.com/285
쓰레드 풀이란
특정 함수를 실행하는 워커(Worker) 쓰레드를 관리하는 객체이다. 다음과 같은 역할을 수행한다.
- 요청받은 작업을 작업 큐에 삽입한다.
- 대기 중인 워커 쓰레드 중 하나를 wakeup하여 작업을 큐에서 꺼내고 실행한다.
- 다수의 쓰레드에서 접근하는 작업 큐는 뮤텍스로 보호한다.
쓰레드 풀 생성
워커 쓰레드를 생성하는 것으로 시작한다.
ThreadPool::ThreadPool(size_t num_threads)
:
num_threads(num_threads),
stop_all(false)
{
worker_threads.reserve(num_threads);
for (size_t i = 0; i < num_threads; ++i)
{
// 워커 쓰레드 추가
worker_threads.emplace_back([this]() { this->WorkerThread(); });
}
}
작업을 추가하는 함수
void ThreadPool::EnqueueJob(std::function<void()> job)
{
if (stop_all)
{
throw std::runtime_error("ThreadPool 사용 중지 됌");
}
{
std::lock_guard<std::mutex> lock(mtx);
// 작업 큐 보호하고 작업을 추가한다.
jobQue.push(std::move(job));
}
cv.notify_one();
}
쓰레드 풀의 종료 상황 - 소멸 시 혹은 임의로 종료하였을 때
1. 쓰레드 풀이 종료된 상태라면, 더 이상 작업 요청을 받지 않는다.
2. 대기 중인 모든 쓰레드를 깨우고 작업 큐가 비어있다면 워커 쓰레드를 종료시킨다.
3. 작업 중 인 워커 쓰레드가 있다면 합류 불가능하게 만들어 (join/detach) 계속 처리하도록 한다.
ThreadPool::~ThreadPool()
{
stop_all = true;
// 자고 있는 모든 쓰레드를 깨운다.
cv.notify_all();
// 합류 불가능하게 만든다.
for (auto& t : worker_threads)
{
t.join();
}
}
워커 쓰레드의 일
block 상태에 있다가 작업 요청에 의해 깨어나고 작업 큐에서 작업을 하나 꺼내 수행한다.
void ThreadPool::WorkerThread()
{
while (true)
{
std::unique_lock<std::mutex> lock(mtx);
// 비어있지 않거나 stop_all일 경우 다음 실행을 진행한다.
cv.wait(lock, [this]() { return !this->jobQue.empty() || stop_all; });
// 쓰레드 종료와 모든 작업이 끝낫을 경우 return
if (stop_all && this->jobQue.empty())
{
return;
}
// 맨 앞의 작업을 뺀다.
auto nextJob = std::move(jobQue.front());
jobQue.pop();
lock.unlock();
// 해당 job을 수행한다.
nextJob();
}
}
쓰레드 풀 클래스 전체 코드
#pragma once
#include <functional>
#include <thread>
#include <queue>
#include <vector>
#include <condition_variable>
#include <mutex>
class ThreadPool
{
public:
ThreadPool(size_t num_threads);
~ThreadPool();
// job 추가
void EnqueueJob(std::function<void()> job);
private:
// 총 Worker 쓰레드의 개수
size_t num_threads;
// Worker 쓰레드를 보관하는 벡터
std::vector<std::thread> worker_threads;
// 할 일들을 보관하는 job 큐
std::queue<std::function<void()>> jobQue;
// 위의 job 큐들을 위한 조건 변수와 뮤텍스
std::condition_variable cv;
std::mutex mtx;
// 모든 쓰레드 종료
bool stop_all;
// 작업 큐에 있는 작업 하나를 실행한다.
void WorkerThread();
};
워커 쓰레드가 실행한 함수 반환값을 받는 쓰레드 풀 구현하기
작업을 추가하는 새로운 함수는 함수와 함수인자를 받는 시그니쳐를 갖는다.
// C ++14
template<class Func, class... Args>
std::future<typename std::result_of<Func(Args...)>::type>
EmplaceJobAndGetFuture(Func&& f, Args&&... args)
추후에 복사없이 호출가능한 객체를 만들기 위해 함수와 인자에 대해 보편 참조를 이용하고 완벽전달을 실행해야한다.
쓰레드를 비동기적으로 실행할 때 함수 반환값을 받는 방법은 쓰레드와 연결된 미래 객체를 생성하여 그 결과나 예외를 받을 수 있게 할 수 있다.
약속 객체 std::promise를 이용할 시 쓰레드에 약속 객체를 전달해야하는 불편함이 있다. std::packaged_task를 이용하면 내부에 약속 객체를 가지고 있으며 실행할 함수 역시 미리 전달하여 내부에 둘 수 있다.
// C++ 14
using return_type = typename std::result_of<Func(Args...)>::type;
// 완벽 전달로 함수와 인수를 전달한다.
auto callable = std::bind(std::forward<Func>(f), std::forward<Args>(args)...);
using job_type = std::packaged_task<return_type()>;
auto job = std::make_shared<job_type>(std::move(callable));
작업을 생성한 뒤 연관된 미래 객체를 반환할 수 있도록 미래 객체를 가져온다.
// 작업과 연관된 미래 객체를 얻어온다.
auto result_future = job->get_future();
그 후는 일반적인 작업을 추가하도록 한다. std::packaged_task를 작업 큐에 오버헤드가 적게 이동시키는 것이 까다롭기 때문에 shared_ptr를 복사하여 람다 형태로 전달해준다.
{
std::lock_guard<std::mutex> lock(mtx);
jobQue.push([job]()
{
(*job)();
});
}
전체 함수
// job을 추가하고 결과를 받을 수 있는 미래 객체를 반환한다.
template<class Func, class... Args>
std::future<typename std::result_of<Func(Args...)>::type>
EmplaceJobAndGetFuture(Func&& f, Args&&... args)
{
// C++ 14
using return_type = typename std::result_of<Func(Args...)>::type;
// 완벽 전달로 함수와 인수를 전달한다.
auto callable = std::bind(std::forward<Func>(f), std::forward<Args>(args)...);
using job_type = std::packaged_task<return_type()>;
auto job = std::make_shared<job_type>(std::move(callable));
// 작업과 연관된 미래 객체를 얻어온다.
auto result_future = job->get_future();
{
std::lock_guard<std::mutex> lock(mtx);
jobQue.push([job]()
{
(*job)();
});
}
cv.notify_one();
return result_future;
}
사용자 코드에서 간편하게 함수와 인자를 전달하여 쓰레드 풀을 사용할 수 있게된다.
#include "ThreadPool.h"
#include <iostream>
using namespace std;
void work(int t, int id)
{
printf("%d start \n", id);
std::this_thread::sleep_for(std::chrono::seconds(t));
printf("%d end after %ds\n", id, t);
}
int calculate(int t, int id)
{
printf("%d start \n", id);
std::this_thread::sleep_for(std::chrono::seconds(t));
printf("%d end after %ds\n", id, t);
return id + t;
}
int main()
{
ThreadPool pool(3);
std::vector<std::future<int>> futures;
// 작업을 10개 실행한다.
for (int i = 0; i < 10; i++)
{
//pool.EnqueueJob([i]() { work(i % 3 + 1, i); });
futures.emplace_back(
pool.EmplaceJobAndGetFuture(calculate, i % 3 + 1, i));
}
for (auto& f : futures)
{
printf("result : %d\n", f.get());
}
return 0;
}
'Computer Science 기본 지식 > 운영체제' 카테고리의 다른 글
[C++ Thread] Windows API에서 쓰레드 생성 (0) | 2021.06.01 |
---|---|
[C++ Thread] 스핀 락 Spin Lock (0) | 2021.05.05 |
[C++ Thread] 약속과 미래 객체, std::promise / std::future (0) | 2021.04.30 |
[C++ Thread] memory order 와 atomic 객체 (0) | 2021.04.27 |
[C++ Thread] 생산자 소비자 문제 (0) | 2021.04.24 |