C#/C# 기본

[C#] 스레드

로파이 2021. 10. 21. 23:51

C#에서 사용되는 스레드와 태스크를 알아본다.

 

스레드

스레드를 사용함으로써 스레드간 자원 공유가 가능하고 스레드를 이용하여 둘 이상의 로직을 동시에 실행하는 것처럼 만들 수 있으며 멀티 코어를 가진 컴퓨터에서 CPU 이용률을 높인다.

 

스레드의 생성과 시작

스레드는 새로운 실행 흐름을 만드는 것이므로 함수를 할당하여 스레드를 만들 수 있다.

// 쓰레드 인스턴스 생성
Thread t1 = new Thread(new ThreadStart(DoSomething));

 

public Thread(ParameterizedThreadStart start);
public Thread(ThreadStart start);
public Thread(ParameterizedThreadStart start, int maxStackSize);
public Thread(ThreadStart start, int maxStackSize);

Thread 라는 클래스를 만들기 위해 ThreadStart 혹은 ParmterizedThreadStart라는 매개변수를 받고 있다.

이들의 정의를 보면 반환이 없는 void 타입의 함수를 지칭하는 대리자(delegate)인 것을 알 수 있다.

public delegate void ThreadStart();
public delegate void ParameterizedThreadStart(object? obj)

 

- 시작 Start()

C++과 다르게 C#의 스레드는 생성시에 바탕 함수를 실행하지 않는다. 스레드는 생성 상태에 있으며 Start()를 명시적으로 호출해야 실행 흐름이 시작된다.

Thread t1 = new Thread(new ThreadStart(DoSomething));
t1.Start();

- 합류 Join()

Join() 함수는 호출한 스레드에서 해당 스레드가 실행을 완료하고 종료할 때 까지 기다리게 된다. 이러한 합류(Join)에 의해 두 실행 흐름이 하나로 합치게된다.

 

C++ Thread vs C# Thread

C++에서는 스레드에서 Joinable이라는 의미가 사용되었다. C++에서 자원은 프로그래머가 직접 관리해야하기 때문에 생성한 스레드 자원은 어디론가 합류(Join)해서 종료될 때까지 기다리거나 분리(Detach)되어서 알아서 자원이 해제하게 한다. C#에서는 모든 인스턴스는 가비지 컬렉터에 의해 관리되므로 C#에서 생성한 스레드가 Join()되지 않았다면 실행이 종료되고 스레드가 반납될 때까지 프로그램은 종료하지 않는다.

 

- 강제종료 Abort()

 static void DoSomething()
 {
   try
   {
     for (int i = 0; i < 100; ++i)
     {
       Console.WriteLine("DoSomething : {0}", i);
       Thread.Sleep(10);
     }
   }
   catch(ThreadAbortException)
   {
   //...
   }
   finally
   {
   //..
   }
}

Abort()를 이용하여 실행 중인 스레드를 강제 종료 할 수 있다. Abort된 스레드는 CLR에 의해 ThreadAbortException 예외를 던진다. 대체로 바람직하지 못한 상황이고 거의 쓸 일이 없다.

 

스레드의 상태

스레드의 상태도

  • UnStarted : 스레드 Thread 개체를 생성한 상태이다.
  • Running : 스레드가 CPU를 사용하여 실행을 하는 중에 있는 상태이다.
  • Suspend : Suspend()의 요청으로 잠시 뒤 일시 중단된 상태가 된다.
  • Aborted : Abort()의 요청으로 잠시 뒤 실행의 취소가 되어 다시 실행할 수 없는 상태가 된다.
  • Stopped : 등록한 대리자의 실행 완료로 Stopped 상태가 된다.
  • WaitSleepJoin : Monitor.Wait() / Thread.Sleep() / Thread.Join() 으로 인해 스레드가 다른 알림으로 인해 다시 실행되기 전까지 블록상태가 된다.
  • Background : 해당 스레드는 백그라운드로 동작하게 되며 해당 스레드의 종료 여부가 프로세스의 종료에 영향을 미치지 않는다. 하지만 반대로 메인 프로세스가 종료하면 백그라운드 스레드도 같이 종료한다.

스레드의 상태는 t.ThreadState로 알 수 있으며 Threadstate라는 열거형 비트 플래그로 나타낸다.

//
    // 요약:
    //     Specifies the execution states of a System.Threading.Thread.
    [Flags]
    public enum ThreadState
    {
        //
        // 요약:
        //     The thread has been started and not yet stopped.
        Running = 0,
        //
        // 요약:
        //     The thread is being requested to stop. This is for internal use only.
        StopRequested = 1,
        //
        // 요약:
        //     The thread is being requested to suspend.
        SuspendRequested = 2,
        //
        // 요약:
        //     The thread is being executed as a background thread, as opposed to a foreground
        //     thread. This state is controlled by setting the System.Threading.Thread.IsBackground
        //     property.
        Background = 4,
        //
        // 요약:
        //     The System.Threading.Thread.Start method has not been invoked on the thread.
        Unstarted = 8,
        //
        // 요약:
        //     The thread has stopped.
        Stopped = 16,
        //
        // 요약:
        //     The thread is blocked. This could be the result of calling System.Threading.Thread.Sleep(System.Int32)
        //     or System.Threading.Thread.Join, of requesting a lock - for example, by calling
        //     System.Threading.Monitor.Enter(System.Object) or System.Threading.Monitor.Wait(System.Object,System.Int32,System.Boolean)
        //     - or of waiting on a thread synchronization object such as System.Threading.ManualResetEvent.
        WaitSleepJoin = 32,
        //
        // 요약:
        //     The thread has been suspended.
        Suspended = 64,
        //
        // 요약:
        //     The System.Threading.Thread.Abort(System.Object) method has been invoked on the
        //     thread, but the thread has not yet received the pending System.Threading.ThreadAbortException
        //     that will attempt to terminate it.
        AbortRequested = 128,
        //
        // 요약:
        //     The thread state includes System.Threading.ThreadState.AbortRequested and the
        //     thread is now dead, but its state has not yet changed to System.Threading.ThreadState.Stopped.
        Aborted = 256
    }
if(t1.ThreadState & ThreadState.Background)
	Console.WriteLine("Is Background");
if(t1.ThreadState & ThreadState.WaitSleepJoin)
	Console.WriteLine("Is WaitSleepJoin");

비트 연산자를 이용하여 해당 스레드의 상태를 알 수 있으며 스레드는 여러 상태를 가질 수 있다. ex) 백그라운드 상태이면서 실행 중, 중지된 상태이면서 WaitSleepJoin 상태에 있는 등.

 

인터럽트 (Interrupt)

스레드를 강제 종료하는 Abort 대신 Interrupt를 사용하여 스레드가 WaitSleepJoin 상태가 될때 종료를 유도한다. 이때 스레드는 CLR에 의해 ThreadInterruptedException 예외를 던진다.

        static void DoSomething()
        {
            try
            {
                for (int i = 0; i < 100; ++i)
                {
                    Console.WriteLine("DoSomething : {0}", i);
                    Thread.Sleep(10);
                }
            }
            catch(ThreadInterruptedException)
            {
                //...
            }
            finally
            {
                //..
            }
        }

 

 

스레드 동기화

 

lock(object)

{

 // 임계 영역

}

 

lock 키워드를 이용하여 중괄호로 임계 영역을 감싸는 형태로 보호한다. lock의 매개변수로 전달하는 object는 참조 형식의 임의의 object를 전달하면 된다. 또다른 임계 영역에서 공유 변수를 사용할 경우 lock의 object에 동일한 인스턴스를 전달하여 동기화를 수행한다.

 

lock의 경우 다시 락을 거는 것을 허용한다. (recursive)

        static void Main(string[] args)
        {
            object obj = new object();

            lock(obj)
            {
                Console.WriteLine("Acquire Lock 1");
                lock(obj)
                {
                    Console.WriteLine("Acquire Lock 2");
                }
            }

            Console.WriteLine("Thread Ended");
        }

사용 예시)

class Counter
    {
        const int LOOP_COUNT = 1000;

        readonly object thisLock;

        private bool lockedCount = false;

        private int count;

        public int Count { get { return count; } }

        public Counter()
        {
            thisLock = new object();
            count = 0;
        }

        public void Increase()
        {
            int loopCount = LOOP_COUNT;
            while(loopCount-- > 0)
            {
                lock(thisLock)
                {
                    ++count;
                }
                Thread.Sleep(1);
            }
        }
        public void Decrease()
        {
            int loopCount = LOOP_COUNT;
            while (loopCount-- > 0)
            {
                lock (thisLock)
                {
                    --count;
                }
                Thread.Sleep(1);
            }
        }

 

동기화로 사용하지 말아야할 object

  • this : 클래스 인스턴스는 내외부적으로 많이 사용되므로 락으로 사용하지 않는다.
  • Type 형식 : typeof 연산자나 object 클래스로 부터 물려받은 GetType()을 통해 쉽게 얻을 수 있으므로 사용하지 않는 게 좋다.
  • string 형식: 비슷한 이유로 이름은 어디서에서나 사용가능하다.

 

Monitor 클래스로 동기화하기

        public void IncreaseMonitor()
        {
            int loopCount = LOOP_COUNT;
            while (loopCount-- > 0)
            {
                Monitor.Enter(thisLock);
                try
                {
                    ++count;
                }
                finally
                {
                    Monitor.Exit(thisLock);
                }
                Thread.Sleep(1);
            }
        }
        public void DecreaseMonitor()
        {
            int loopCount = LOOP_COUNT;
            while (loopCount-- > 0)
            {
                Monitor.Enter(thisLock);
                try
                {
                    --count;
                }
                finally
                {
                    Monitor.Exit(thisLock);
                }
                Thread.Sleep(1);
            }
        }

Monitor.Enter()와 Monitor.Exit()으로 임계영역을 보호할 수 있다.

임계영역에서 예외가 발생하면 잠금을 풀지 못하고 빠져나올 수 있으므로 finally를 통해 반드시 잠금을 해제하는 것을 보장하도록 한다.

 

lock과 Monitor

lock과 Monitor의 작동방식은 똑같으며 lock은 Monitor의 try Enter() ~ finally Exit()을 함축한 것이라고 할 수 있다. 따라서 lock 구문 자체도 임계영역의 예외에 안전하게 락을 해제하고 빠져나온다.

내부 구현은 임계영역을 들어가기전 경쟁하고 있는 스레드가 있는지 확인한다. 이미 락을 획득하고 실행중인 쓰레드가 있다면 잠시 스핀 카운트를 돌며 락을 얻는 것을 시도해본다. 어느 정도 긴 시간이 지났다고 판별되면 해당 스레드는 블락되어 CLREvent를 생성하고 커널 모드의 기능을 사용하여 대기 큐에 진입하게 된다.

 

Monitor의 Wait과 Pulse로 하는 저수준 동기화

주로 Monitory 대신 lock을 사용하지만 좀 더 섬세한 동기화가 필요하다면 Wait과 Pulse를 이용할 수 도 있다.

  • Wait() : Wait()을 호출하게되면 해당 스레드는 WaitSleepJoin 상태에 들어가게 되고 소유하고 있던 락을 해제하고 CLR의 Waiting Queue에 들어가게 된다. 
  • Pulse() : 반대로 락을 얻어 작업을 마친 스레드는 Pulse()를 이용하여 Waiting Queue에 있던 스레드를 하나 꺼내 Ready Queue에 입력한다. 해당 스레드는 lock을 얻으려 시도하고 성공하면 자신의 작업을 수행한다.

Wait()과 Pulse()

 

C#의 Wait()과 Pulse()와 C++의 condition_variable

Wait()과 Pulse() 구조는 C++의 condition_variable과 같다고 볼 수 있다. 

void DoWork()
{
    // condition_variable은 unique_lock을 매개변수로 받는다
    // 해당 뮤텍스로 부터 알림을 받는 조건 변수
    std::unique_lock<std::mutex> lk(m);
    cv.wait(lk, []{return ready;});
 
 	// 임계영역 (작업 프로시저)

    lk.unlock();
    cv.notify_one(); // 다른 쓰레드의 진입 가능을 알린다.
}

생산자-소비자 구조에서 자주 활용되는 조건 변수를 통한 동기화는 특정 뮤텍스로 부터 알림(notify)를 기다리게 되는데 이는 C#의 Monitor가 Wait(object lock)를 통해 Pulse()의 호출을 기다리는 것과 똑같다.

 

Wait()를 사용하는데 주의점

Wait(lock)은 전달한 락 오브젝트가 반드시 잠긴 상태에서 호출되어야 한다. lock 동기화가 이루어지지 않은 상태에서 해당 함수를 호출하면 SynchronizationLockException 예외가 발생한다. 따라서, 주로 lock으로 감싼 임계영역 내용에서 Wait(lock)을 사용한다.

 

C#의 Pulse()와 Wait()을 이용한 1 Producer - 1 Consumer 구현

더보기
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;

namespace ProducerAndConsumer
{
    class Kernel
    {
        private Queue<string> products = new();
        readonly object mutex = new();
        public int Count { get { return products.Count; } }
        public string GetProduct() 
        {
           if (products.Count == 0)
                    return "";

           return products.Dequeue();
        } 

        public void ProduceOnce(string product)
        {
           products.Enqueue(product);
        }

        public bool IsStart { get; set; } = false;
        public bool IsEnd { get; set; } = false;
    }

    class Producer
    {
        private string filePath;
        private readonly Kernel sync;
        private int IdleDuration = 10;
        public int Rate
        {
            get { return 1000 / IdleDuration; }
            set { IdleDuration = 1000 / value; }
        }
        public Producer(string _filePath, Kernel sync)
        {
            this.sync = sync;
            this.filePath = _filePath;
        }

        public void Produce()
        {
            Console.WriteLine("Producer Starts");

            using (StreamReader sr = new StreamReader(new FileStream(filePath, FileMode.Open)))
            {
                while (!sr.EndOfStream) 
                {
                    lock (sync)
                    {
                        sync.ProduceOnce(sr.ReadLine());
                        //Console.WriteLine("Producer Produced {0}", ++ProduceCount);

                        Monitor.Pulse(sync);
                    }

                    Thread.Sleep(IdleDuration);
                }

                while(true)
                {
                    lock (sync)
                    {
                        sync.IsEnd = true;
                        Monitor.Pulse(sync);
                        Console.WriteLine("Producer is Finished");
                        return;
                    }
                }
            }
        }
    }
    class Consumer
    {
        private readonly Kernel sync;
  
        public Consumer(Kernel sync)
        {
            this.sync = sync;
        }

        public void Consume()
        {
            while (true)
            {
                lock (sync)
                {
                    /*
                       Wait 함수 : lock을 획득한 채로 호출해야함
                                  lock을 일단 반환하고 재 획득할 때까지 기다림
                    */
                    while (sync.Count == 0 && !sync.IsEnd)
                    {
                        Monitor.Wait(sync);
                    }

                    if (sync.IsEnd)
                    {
                        Console.WriteLine("Consumer is Finished");
                        return;
                    }

                    while(sync.Count > 0)
                    {
                        string product = sync.GetProduct();

                        Console.WriteLine("Consumer gets : {0}", product);
                    }
                }
            }
        }
    }

    class Program
    {
        static void Main(string[] args)
        {
            Kernel sync = new Kernel();

            Producer producer = new Producer("data.txt", sync);
            Consumer consumer = new Consumer(sync);
            producer.Rate = 100;

            Thread doProduce = new Thread(producer.Produce);
            Thread doConsume = new Thread(consumer.Consume);

            doConsume.Start();
            doProduce.Start();

            doConsume.Join();
            doProduce.Join();
        }
    }
}

 

'C# > C# 기본' 카테고리의 다른 글

[C#] default 연산자  (0) 2021.11.08
[C#] 파일 입출력  (0) 2021.10.21
[C#] dynamic 타입  (0) 2021.10.19
[C#] 리플렉션과 애트리뷰트  (0) 2021.10.19
[C#] LINQ  (0) 2021.10.18