아이템 35. PLINQ가 병렬 알고리즘을 구현하는 방법을 이해하라
AsParallel()로 생성되는 병렬 버전의 순회 타입 ParallelEnumerable<T>와 관련된 기능을 알아본다.
LINQ to Objects로 실행되는 예제를 위주로 알아본다. PLINQ는 LINQ to SQL이나 EntitiyFramework 알고리즘을 병렬화 하는 데 별로 도움이 안 된다. 어차피 쿼리를 병렬 처리하기 위해 데이터 베이스 엔진의 병렬 처리 기능을 사용하기 때문이다.
public static void PLINQ_Ex1()
{
List<int> data = new List<int>() { 1, 2, 3, 4, 5, 6, 7, 8, 9 };
// 순차 순회
var nums = data.Where(m => m < 150).Select(n => Factorial(n));
// 병렬 처리 순회
var numsParallel = data.AsParallel().Where(m => m < 150).Select(n => Factorial(n));
}
IEnumerable<T> 타입은 AsParallel()로 ParallelQuery<T>를 만들어 낸다. ParallelQuery<T>는 IEnumerable<T>를 구현하였으며 GetEnumerator() 메서드가 있다.
namespace System.Linq
{
//
// 요약:
// Represents a parallel sequence.
//
// 형식 매개 변수:
// TSource:
// The type of element in the source sequence.
public class ParallelQuery<TSource> : ParallelQuery, IEnumerable<TSource>, IEnumerable
{
//
// 요약:
// Returns an enumerator that iterates through the sequence.
//
// 반환 값:
// An enumerator that iterates through the sequence.
public virtual IEnumerator<TSource> GetEnumerator();
}
}
AsParallel()를 호출하면 이어서 수행하는 연산을 멀티 스레드를 사용하여 여러 코어에서 실행한다. LINQ가 IEnumerable<T>에 대해 확장 메서드를 제공하는 것처럼 PLINQ는 ParallelQuery<T>에 대해 확장 메서드를 제공한다.
병렬 쿼리는 입력 요소를 분할하는 작업부터 시작한다. 분할 작업은 입력 소스와 쿼리의 종류에 따라 다음 네가지 분할 전략을 사용한다.
1. 범위 분할
입력 범위에 따라 분할 하는 것으로 가장 단순한 방법이다. 1000개의 요소가 있다면 4분할을 하여 처음 부터 250개, 그다음 250개, 그 다음 250개, 마지막 250개로 분할한다. 범위 분할은 순서 인덱스가 있고 총 몇개의 요소가 있는지 알수 있을 때만 사용가능하다. 이러한 입력 소스는 List<T>혹은 IList<T>로 한정된다.
2. 덩어리 분할
입력 값을 덩어리로 나누어 처리한다. 덩어리 크기는 요청마다 변할 수 있다. 덩어리 분할 알고리즘은 구현 방식이 계속 바뀌고 있어 자세한 것은 직접 찾아봐야한다.
3. 줄 단위 분할
특정 쿼리 연산에서 적용할 때 효과적인 분할 방식이다. 첫번째 요소를 찾아내는 처리하는 방법을 최적화한다. N개의 요소를 건너뛰고 일련의 M개의 요소를 처리한다.
4. 해시 분할
Join, GroupJoin, GroupBy, Distinct, Except, Union, Intersect를 사용하는 쿼리에 특화된 분할 방법이다. 해시 코드를 생성하는 항목을 같은 태스크가 처리하도록 하여 쿼리를수행하는 태스크 사이의 상호작용을 최소화한다.
분할 알고리즘의 별개로 PLINQ의 태스크를 병렬화 하기 위해 3개의 알고리즘을 사용한다.
1. 파이프라이닝
하나의 스레드가 순회를 담당한다. 순회를 하면서 각 원소에 쿼리 처리를 요청하고 요청이 생길때마다 추가 스레드에서 작업을 수행한다. 파이프라이닝 모드에서는 보통 코어 수 만큼 스레드가 생기며 다음 요소는 쉬고 있는 스레드 혹은 모두 사용중일 경우 이제 막 작업을 끝낸 스레드가 바로 담당한다.
따라서 보통 코어수 + 1개의 스레드가 생성된다. 순회 담당 스레드는 대부분의 시간을 대기하게될 것이므로 새로 만드는 것이 좋다.
2. 스톱앤고 방식
순회를 시작한 스레드가 결과를 받아 전달하여야 하기 때문에 모든 처리가 끝날때 까지 작업을 조인한다.
병렬 쿼리의 결과를 ToList() 혹은 ToArray()로 받거나 OrderBy()와 Sort()와 같이 중간 결과를 사용해야할 때 적용된다.
var numsParallelImmediate = data.AsParallel().Where(m => m < 150).Select(n => Factorial(n)).ToList();
var numsParallelOrdered = data.AsParallel().Where(m => m < 150).Select(n => Factorial(n)).OrderBy(n => n);
3. 역열거형 방식
결과를 생성하지 않고 모든 쿼리식의 결과에 다른 액션을 취하는 경우 사용
LINQ to Object 비병렬 쿼리는 지연 평가된다. 각 값은 순회될 때 평가되는데, 쿼리의 결과를 처리할 때에 병렬 실행을 적용하는 경우 역열겨헝 방식을 사용한다.
병렬로 처리되기 때문에 순회 요소에대한 처리 순서는 거의 보장되지 않는다.
public static void PLINQ_Ex2()
{
var nums = data.AsParallel().Where(m => m <= 100).Select(n => new { N = n, Result = Factorial(n) });
nums.ForAll(item => Console.WriteLine($"item {item.N} : {item.Result}"));
}
LINQ 쿼리는 기본적으로 지연 실행 방식을 선택한다. 쿼리를 미리 만들어 두고 순회와 같은 방식으로 결과를 요청할 때 실행된다. PLINQ 쿼리는 LINQ to SQL이나 Entity Framework와 비슷하여 첫번째 항목을 요청하면 전체 시퀀스에 대한 결과가 생성된다.
PLINQ는 쿼리식을 어떻게 병렬 처리할 것인지 최적의 방법을 찾아준다. 그렇지만 때때로 병렬 처리가 항상 효과적이다고는 할 수 없다. 병렬 처리는 항상 해당 시퀀스의 전체 결과를 만드려고 하기 때문에 지연 실행 방식이 더 나을 수 도 있다. 멀티 스레드를 사용할 수 있다면 대체적으로 효율적으로 동작할 수 있으나 암달의 법칙등으로 순차적으로 처리하는 부분이 얼마나 되냐에 따라서 달라질 수 있다.
OrderBy(), ThenBy()는 병렬 태스크 간 조율이 필요하고 Skip, SkipWhile, Take, TakeWhile은 병렬화 정도에 영향을 미친다. AsOrdered()와 AsUnordered() 메서드를 사용하면 PLINQ가 결과 시퀀스에서 순서를 지키거나 무시하도록 할 수 있다. 병렬 처리가 바람직하지 않다고 생각되는 경우 중간에 AsSequential()을 강제하여 병렬 시퀀스를 순차적으로 처리하도록 강제할 수 있다.
ParallelQuery<T>가 제공하는 추가적인 병렬 처리 옵션은 다음과 같다.
- WithExecutionMode() : 되도록 병렬 실행되도록 유도하지만 오버헤드가 큰 알고리즘이 선택될 수 있다. 기본적으로는 병렬화했을 때 수행 성능이 개선될 것으로 기대되는 부분만 병렬처리한다.
- WithDegreeOfParallelism() : 알고리즘에 사용되는 병렬 멀티 스레드 개수를 설정한다.
- WithMergeOptions() : PLINQ가 쿼리 중에 버퍼링하는 방식을 선택한다. 기본적으로 PLINQ는 각 스레드가 수행한 결과를 버퍼링하며, 다른 스레드가 이 결과를 활용한다.
아이템 36. 예외를 염두에 두고 병렬 알고리즘을 만들라
병렬 처리는 자식 스레드를 만드는데, 자식 스레드가 잘 못되는 경우는 잘 생각하지 않는다. 실제로 작업이 예외를 던질 수 있으며 그 처리는 개발자의 몫이다.
백그라운드 스레드에서 실행되는 작업이 예외를 발생시키는 경우 상황이 더 복잡해진다. 그 예외가 다른 스레드로 전달 될 수 없기 때문에 콜 스택이 이어질 수 없다. 어떤 예외가 발생했는 지 알 수 없고 추척하기도 어렵다. 병렬 처리의 경우 예외가 발생했을 경우 롤 백을 해야되는 경우가 생길 수도 있다.
Task를 반환하는 비동기 메서드를 실행하는 경우
병렬 작업을 Task.Run() 과 같은 방법으로 수행하면 해당 작업은 이제 백그라운드에서 진행된다. 해당 Task를 Wait() 혹은 Result 속성을 접근하여 작업의 결과를 요청하게 되는 경우 모든 작업에서 발생하는 예외가 담긴 AggregateException가 예외로 던져질 수 있다. AggregateException은 InnerExceptions 컬렉션을 순회하여 모든 예외 정보를 알아내야한다.
public static void ParallelException_Ex1()
{
try
{
Task task = Task.Run(() => { Thread.Sleep(1000); throw new ArgumentException("ParallelException"); });
task.Wait();
}
catch(AggregateException aggregate)
{
ReportAggregateError(aggregate);
}
}
private static void ReportAggregateError(AggregateException aggregate)
{
foreach (var ex in aggregate.InnerExceptions)
{
if (ex is AggregateException agEx)
{
ReportAggregateError(agEx);
}
else
{
Console.WriteLine($"Exception : {ex.Message}");
}
}
}
포함된 예외들 InnerExceptions 중 처리할 수 있는 예외는 현재 단계에서 처리하고 그렇지 않은 경우 상위 호출에서 예외를 처리할 수 있도록 예외를 다시 던져야할 수 있다. 다음과 같이 딕셔너리 타입에 예외 핸들러를 등록하고 발생한 예외들을 처리도중 핸들링 할 수 없는 예외에 대해 다시 예외를 던지게끔 만들어야한다.
public static void ParallelException_Ex2()
{
try
{
Task task = Task.Run(() => { Thread.Sleep(1000); throw new ArgumentException("ParallelException"); });
task.Wait();
}
catch (AggregateException ex)
{
var handlers = new Dictionary<Type, Action<Exception>>();
handlers.Add(typeof(WebException), ex => Console.WriteLine(ex.Message));
if (!HandleAggregateError(ex, handlers))
{
throw;
}
}
}
private static bool HandleAggregateError(AggregateException aggregate, Dictionary<Type, Action<Exception>> handlers)
{
foreach (var ex in aggregate.InnerExceptions)
{
if (ex is AggregateException agEx)
{
if (!HandleAggregateError(agEx, handlers))
return false;
}
else if (handlers.ContainsKey(ex.GetType()))
{
handlers[ex.GetType()](ex);
}
else
return false;
}
return true;
}
혹은 작업 내부 내용에서 try-catch 구문으로 예외를 처리하여 밖으로 예외가 나가지 않게끔 만드는 것도 방법이다. 밖에서 처리하지 않아도 되는 경우 대부분 예외가 밖으로 나가지 않게 하는 것이 바람직하다.
PLINQ를 사용하는 경우 컬렉션에 대한 모든 결과를 만들어낼 때 백그라운드 스레드를 이용한다. 이는 Task.Run()과 같이 AggregateException을 던지기 때문에 위와 같은 예외 핸들링을 적용해야한다.
또한 PLINQ는 LINQ의 순회를 할 때 평가되는 것과 다르게 개별 요소를 처리하는 단계에서 예외가 빨리 발생할 수 있으므로 쿼리문도 같이 try-catch로 감싸야한다.
public static long Factorial_Throw(int n)
{
if (n < 0)
throw new ArgumentException("Negative Value");
return Factorial(n);
}
public static void PLINQ_Ex3()
{
data.AddRange(Enumerable.Range(-10, 10));
try
{
var nums = data.AsParallel().Where(m => m <= 100).Select(n => new { N = n, Result = Factorial_Throw(n) });
nums.ForAll(item => Console.WriteLine($"item {item.N} : {item.Result}"));
}
catch(AggregateException ex)
{
var handlers = new Dictionary<Type, Action<Exception>>();
handlers.Add(typeof(ArgumentException), ex => Console.WriteLine(ex.Message));
if (!HandleAggregateError(ex, handlers))
{
throw;
}
}
}
PLINQ의 동작 방식은 태스크 병렬 라이브러리 (Task Parallel Library)에 의해 결정되고 TPL은 AggregateException 클래스를 사용해서 병렬 알고리즘에서 발생하는 모든 예외를 저장한다.
아이템 37. 스레드를 생성하지 말고 스레드 풀을 사용하라
태스크 병렬 라이브러리 TPL은 스레드 풀을 이용하여 시스템 환경에 최적화된 적절한 수의 스레드를 미리 생성해 놓고 백그라운드 작업을 할당한다. Task.Run()은 스레드 풀의 스레드를 사용해서 그런 백그라운드 작업을 할당하는 함수이다.
스레드 풀의 활성 태스크 수는 시스템이 관리한다. 스레드 풀이 알아서 부하를 분산하기 때문에 부하 분산 로직을 직접 작성할 필요가 없다.
스레드 풀의 성능이 좋은 이유
1. 스레드 풀은 작업을 수행할 준비가 된 스레드를 재사용한다.
2. 스레드 풀이 활성 스레드의 개수를 자동으로 관리한다.
- QueueUserWorkItem()은 스레드 풀에서 가용한 스레드에 작업을 할당하는 식으로 스레드 리소스를 관리한다.
3. 코어 수가 많아지면 많아질 수록 멀티스레드 프로그램을 만들 가능성이 커진다.
아이템 38. 스레드간 커뮤니케이션에는 BackgroundWorker를 사용하라
System.ComponentModel.BackgroundWorker 클래스는 ThreadPool을 이용하여 백그라운드 작업을 시작할 수 있고 스레드간 커뮤니케이션을 가능하도록 한다.
백그라운드 작업에서 발생하는 예외가 전달되지 않기 때문에 스레드가 어떻게 진행되고 있는지 알 수가 없다. BackgroundWorker 클래스는 QueueUserWorkItem()을 내부적으로 사용하지만 백그라운드와 포그라운드 스레드간 커뮤니케이션을 가능하게 한다.
다음과 같은 스레드간 커뮤니케이션을 할 수 있다.
- Run작업의 완료 이벤트를 알 수 있다. 이벤트 인수의 Error 속성을 통해 예외가 발생했는 지 알 수 있다.
- 작업의 취소를 알 수 있다.
- 중간 작업의 진행도를 보고 받을 수 있다.
참고: 2021.12.15 - [C#/Advanced C#] - [C# Thread] BackgroundWorker 클래스
아이템 39. XAML 환경에서 스레드 간 호출을 이해하라
WPF와 같은 어플리케이션에서 스레드 컨트롤을 이해한다.
- 생략
아이템 40. 동기화에는 lock()을 최우선으로 사용하라
임계영역을 보호하는 lock() 동기화 기법은 해당 참조 타입의 인스턴스의 참조 카운트를 활용하여 상호배제 접근을 가능하게 한다.
lock()은 Monitor.Enter()-Exit() 쌍을 try-finally문으로 만들어 항상 감싼 영역을 벗어났을 때 락을 해제하도록 한다.
lock()의 단점
lock()의 주기는 항상 중괄호 내로 제한되지만 Monitor와 같은 동기화 객체를 직접 사용하는 경우 언제 잠그고 해제할 지 컨트롤할 수 있다.
Monitor.Enter()는 타임아웃을 지원한다.
lock()의 장점
컴파일러가 해당 인수가 참조 타입인지 컴파일 시점에 검사한다. Monitor를 사용해서 값 타입을 사용하는 경우 박싱이 일어나고 박싱으로 만들어진 임시 객체에 대해 잠그게 된다. Exit()에서도 값 타입을 사용할 경우 Enter()와 다른 박싱된 인스턴스를 사용하게 되고 SynchrnoizationLockException 예외를 던지게 된다.
사용자의 실수로 락을 해제를 못 한 경우를 방지할 수 있다.
Monitor.TryEnter()의 경우 타임아웃을 지정하여 교착 상태를 방지할 수 있다. 이를 래핑하여 LockHolder 클래스를 만들고 다음과 같이 사용할 수 있다.
public sealed class LockHolder<T> : IDisposable where T : class
{
private T handle;
private bool holdsLock;
public LockHolder(T handle, int milliSecondTimeout)
{
if (Object.ReferenceEquals(handle, null))
throw new ArgumentNullException("LockHolder takes null object");
this.handle = handle;
this.holdsLock = Monitor.TryEnter(handle, milliSecondTimeout);
}
public bool LockSuccessful
{
get
{
return holdsLock;
}
}
public void Dispose()
{
if (holdsLock)
Monitor.Exit(handle);
holdsLock = false;
}
}
object sync = new object();
using (LockHolder<object> lk = new LockHolder<object>(sync, 100))
{
if (lk.LockSuccessful)
{
// CriticalSection
}
}
lock이외에도 다양한 동기화 기법을 사용하도록 권장한다.
Interlocked을 사용하여 단일 정수 변수에 대한 원자적 연산을 이용하거나 Monitor의 Pulse()/Wait()과 같은 방법으로 생산자-소비자 형태의 시스템에 적용할 수 있다.
ReadWriterLockSlim은 읽기보다 수정하는 빈도가 적을 경우 효율적인 동기화 기법을 제공한다.
아이템 41. 락은 가능한 좁은 범위에 적용하라
굳이 동기화를 제공하지 않아도 되는 영역에도 락을 들고 있을 이유가 없다.
아이템 42. 잠긴 영역에서는 외부 코드 호출을 삼가라
보호 받는 영역에서 외부 호출을 사용하게된다면 교착 상태에 빠질 위험이 있다.
ex)
스레드 A에서 Func1()를 호출한다. Func1()은 lock을 걸고 WPF 어플리케이션의 Control.Invoke() 메서드를 호출한다. 해당 메서드가 끝날때 까지 현재 스레드를 블락시키고 Control.Invoke()에의해 수행되는 Func2()는 UI 스레드에서 수행된다. Func2()는 어쩌다 Func1()이 잠근 lock을 접근하는 메서드를 호출한다. 해당 lock을 얻을 수 없으므로 두 스레드는 교착상태에 빠지게 된다.
참고 : More Effective C# 빌 와그너 - 한빛 미디어
'C# > Advanced C#' 카테고리의 다른 글
[C#] BitConverter 클래스 (0) | 2022.04.14 |
---|---|
[C# Thread] BackgroundWorker 클래스 (0) | 2021.12.15 |
[More Effective C#] 3장 태스크 기반 비동기 프로그래밍 (0) | 2021.12.13 |
[More Effective C#] 2장 API 설계 (2) (0) | 2021.12.12 |
[More Effective C#] 2장 API 설계 (1) (0) | 2021.12.12 |