Computer Science 기본 지식/운영체제

[C++ Thread] 쓰레드 풀 Thread Pool

로파이 2021. 4. 30. 19:57

다음을 참고하여 작성하였습니다.

모두의 코드, 쓰레드 풀: modoocode.com/285

 

쓰레드 풀이란

특정 함수를 실행하는 워커(Worker) 쓰레드를 관리하는 객체이다. 다음과 같은 역할을 수행한다.

- 요청받은 작업을 작업 큐에 삽입한다.

- 대기 중인 워커 쓰레드 중 하나를 wakeup하여 작업을 큐에서 꺼내고 실행한다.

- 다수의 쓰레드에서 접근하는 작업 큐는 뮤텍스로 보호한다.

 

쓰레드 풀 생성

워커 쓰레드를 생성하는 것으로 시작한다.

ThreadPool::ThreadPool(size_t num_threads)
	:
	num_threads(num_threads),
	stop_all(false)
{
	worker_threads.reserve(num_threads);
	for (size_t i = 0; i < num_threads; ++i)
	{
		// 워커 쓰레드 추가
		worker_threads.emplace_back([this]() { this->WorkerThread(); });
	}
}

 

작업을 추가하는 함수

void ThreadPool::EnqueueJob(std::function<void()> job)
{
	if (stop_all)
	{
		throw std::runtime_error("ThreadPool 사용 중지 됌");
	}
	{
		std::lock_guard<std::mutex> lock(mtx);

		// 작업 큐 보호하고 작업을 추가한다.
		jobQue.push(std::move(job));
	}
	cv.notify_one();
}

 

쓰레드 풀의 종료 상황 - 소멸 시 혹은 임의로 종료하였을 때

1. 쓰레드 풀이 종료된 상태라면, 더 이상 작업 요청을 받지 않는다.

2. 대기 중인 모든 쓰레드를 깨우고 작업 큐가 비어있다면 워커 쓰레드를 종료시킨다.

3. 작업 중 인 워커 쓰레드가 있다면 합류 불가능하게 만들어 (join/detach) 계속 처리하도록 한다.

ThreadPool::~ThreadPool()
{
	stop_all = true;
	// 자고 있는 모든 쓰레드를 깨운다.
	cv.notify_all();

	// 합류 불가능하게 만든다.
	for (auto& t : worker_threads)
	{
		t.join();
	}
}

 

워커 쓰레드의 일

block 상태에 있다가 작업 요청에 의해 깨어나고 작업 큐에서 작업을 하나 꺼내 수행한다.

void ThreadPool::WorkerThread()
{
	while (true)
	{
		std::unique_lock<std::mutex> lock(mtx);

		// 비어있지 않거나 stop_all일 경우 다음 실행을 진행한다.
		cv.wait(lock, [this]() { return !this->jobQue.empty() || stop_all; });
		
		// 쓰레드 종료와 모든 작업이 끝낫을 경우 return
		if (stop_all && this->jobQue.empty())
		{
			return;
		}

		// 맨 앞의 작업을 뺀다.
		auto nextJob = std::move(jobQue.front());
		jobQue.pop();
		lock.unlock();

		// 해당 job을 수행한다.
		nextJob();
	}
}

 

쓰레드 풀 클래스 전체 코드

#pragma once
#include <functional>
#include <thread>
#include <queue>
#include <vector>
#include <condition_variable>
#include <mutex>

class ThreadPool
{
public:
	ThreadPool(size_t num_threads);
	~ThreadPool();
	
	// job 추가
	void EnqueueJob(std::function<void()> job);

private:
	// 총 Worker 쓰레드의 개수
	size_t num_threads;
	// Worker 쓰레드를 보관하는 벡터
	std::vector<std::thread> worker_threads;
	// 할 일들을 보관하는 job 큐
	std::queue<std::function<void()>> jobQue;
	// 위의 job 큐들을 위한 조건 변수와 뮤텍스
	std::condition_variable cv;
	std::mutex mtx;

	// 모든 쓰레드 종료
	bool stop_all;

	// 작업 큐에 있는 작업 하나를 실행한다.
	void WorkerThread();
};

 

워커 쓰레드가 실행한 함수 반환값을 받는 쓰레드 풀 구현하기

 

작업을 추가하는 새로운 함수는 함수와 함수인자를 받는 시그니쳐를 갖는다.

// C ++14
template<class Func, class... Args>
std::future<typename std::result_of<Func(Args...)>::type>
	EmplaceJobAndGetFuture(Func&& f, Args&&... args)

추후에 복사없이 호출가능한 객체를 만들기 위해 함수와 인자에 대해 보편 참조를 이용하고 완벽전달을 실행해야한다.

 

쓰레드를 비동기적으로 실행할 때 함수 반환값을 받는 방법은 쓰레드와 연결된 미래 객체를 생성하여 그 결과나 예외를 받을 수 있게 할 수 있다.

 

약속 객체 std::promise를 이용할 시 쓰레드에 약속 객체를 전달해야하는 불편함이 있다. std::packaged_task를 이용하면 내부에 약속 객체를 가지고 있으며 실행할 함수 역시 미리 전달하여 내부에 둘 수 있다.

// C++ 14
using return_type = typename std::result_of<Func(Args...)>::type;
// 완벽 전달로 함수와 인수를 전달한다.
auto callable = std::bind(std::forward<Func>(f), std::forward<Args>(args)...);

using job_type = std::packaged_task<return_type()>;
auto job = std::make_shared<job_type>(std::move(callable));

 

작업을 생성한 뒤 연관된 미래 객체를 반환할 수 있도록 미래 객체를 가져온다.

// 작업과 연관된 미래 객체를 얻어온다.
auto result_future = job->get_future();

 

그 후는 일반적인 작업을 추가하도록 한다. std::packaged_task를 작업 큐에 오버헤드가 적게 이동시키는 것이 까다롭기 때문에 shared_ptr를 복사하여 람다 형태로 전달해준다.

{
  std::lock_guard<std::mutex> lock(mtx);

  jobQue.push([job]()
  {
    (*job)();
  });
}

 

전체 함수

	// job을 추가하고 결과를 받을 수 있는 미래 객체를 반환한다.
	template<class Func, class... Args>
	std::future<typename std::result_of<Func(Args...)>::type>
		EmplaceJobAndGetFuture(Func&& f, Args&&... args)
	{
		// C++ 14
		using return_type = typename std::result_of<Func(Args...)>::type;
		// 완벽 전달로 함수와 인수를 전달한다.
		auto callable = std::bind(std::forward<Func>(f), std::forward<Args>(args)...);

		using job_type = std::packaged_task<return_type()>;
		auto job = std::make_shared<job_type>(std::move(callable));
		// 작업과 연관된 미래 객체를 얻어온다.
		auto result_future = job->get_future();
		{
			std::lock_guard<std::mutex> lock(mtx);

			jobQue.push([job]()
			{
				(*job)();
			});
		}
		cv.notify_one();

		return result_future;
	}

 

사용자 코드에서 간편하게 함수와 인자를 전달하여 쓰레드 풀을 사용할 수 있게된다.

#include "ThreadPool.h"
#include <iostream>

using namespace std;

void work(int t, int id)
{
	printf("%d start \n", id);
	std::this_thread::sleep_for(std::chrono::seconds(t));
	printf("%d end after %ds\n", id, t);
}

int calculate(int t, int id)
{
	printf("%d start \n", id);
	std::this_thread::sleep_for(std::chrono::seconds(t));
	printf("%d end after %ds\n", id, t);
	return id + t;
}

int main()
{
	ThreadPool pool(3);

	std::vector<std::future<int>> futures;
	// 작업을 10개 실행한다.
	for (int i = 0; i < 10; i++)
	{
		//pool.EnqueueJob([i]() { work(i % 3 + 1, i); });
		futures.emplace_back(
			pool.EmplaceJobAndGetFuture(calculate, i % 3 + 1, i));
	}

	for (auto& f : futures)
	{
		printf("result : %d\n", f.get());
	}

	return 0;
}