[TCP/IP 소켓 프로그래밍] (12) 다중 접속 서버 - 쓰레드
멀티 프로세스 기반 서버의 단점
새로운 클라이언트가 연결될 때마다 프로세스를 생성하는 것은 기존 프로세스의 메모리를 복사하는 것으로 비용이 크다.
두 프로세스간 통신을 위해 IPC라는 방법을 사용해야한다.
결정적으로 CPU 사용 시간을 나누어 사용하기 때문에 두 프로세스 간 문맥 교환이 일어난다. 프로세스 문맥 교환은 한 프로세스를 메인 메모리에서 내리고 다른 프로세스를 메인 메모리로 올리기 때문에 시간이 오래 걸리게 된다.
쓰레드란
쓰레드는 프로세스를 구성하는 실행 흐름으로 단위가 작다. 프로세스는 독립적인 메모리공간을 사용하기 때문에 데이터, 스택, 힙의 영역을 각 프로세스마다 독립적으로 갖는다.
쓰레드는 프로세스의 일부분으로 한 프로세스 내 여러 쓰레드는 프로세스 메모리 공간을 공유할 수 있다. 데이터, 힙과 같은 영역은 다른 쓰레드와 메모리를 공유하고 스레드 자체는 스택과 레지스터와 같이 독립적인 실행 흐름을 보장하기 위한 메모리를 할당 받는다.
즉, 쓰레드가 독립적으로 사용하는 메모리 공간은 더 작기 때문에 프로세스보다 가벼우며 이는 문맥 교환에도 빠르고 일반적인 함수 호출과 같은 단순한 문맥 교환이 적용된다.
---- 리눅스 환경에서
쓰레드 생성
int pthread_create(pthread_t* restrict thread, const pthread_attr_t *restrict attr,
void *(*start_routine)(void*), void *restrict arg);
- 성공 시 0, 실패 시 0외의 값
- thread: 생성할 쓰레드의 ID를 저장할 주소값 전달
- attr: 쓰레드에 부여할 속성 정보
- start_routine: 쓰레드의 메인 함수가 되는 함수의 주소
- arg: 쓰레드에 등록된 함수가 호출될 때 전달할 인자
쓰레드 생성시 등록된 함수가 생성한 쓰레드에서 실행된다.
쓰레드 Join
생성된 쓰레드가 생성한 상위 쓰레드 혹은 프로세스보다 늦게 종료될 때 상위 실행 객체의 소멸로 같이 종료될 수 있다.
상위 쓰레드 혹은 프로세스에서 join을 명시하여 생성된 쓰레드가 종료되고 반환값이 전달될 때까지 블로킹 상태에 있도록 한다. 다음 함수의 반환값이 반환되면 블로킹 상태에서 빠져나오게 된다.
int pthread_join(pthread_t thread, void **status);
- 성공 시 0, 실패 시 0 이외의 값 반환
- thread: 이 매개변수에 전달되는 ID의 쓰레드가 종료될 때까지 함수는 반환하지 않는다.
- status: 쓰레드의 main 함수가 반환하는 값이 저장될 포인터 변수의 주소 값을 전달한다.
예제 코드
#define _REENTRANT
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
void* thread_main(void * arg);
int main(int argc, char*argv[])
{
pthread_t t_id;
int thread_param = 5;
void* thr_ret;
// create a thread "thread_main"
if(pthread_create(&t_id, NULL, thread_main, (void*)&thread_param) != 0)
{
puts("pthread_create() error");
return -1;
}
// block main until t_id thread returns
if(pthread_join(t_id, &thr_ret) != 0)
{
puts("pthread_join() error");
return -1;
}
printf("Thread return message %s \n", (char*)thr_ret);
free(thr_ret);
return 0;
}
void* thread_main(void *arg)
{
int i;
int cnt = *((int*)arg);
char* msg = (char*)malloc(sizeof(char) * 50);
strcpy(msg, "Hello, I am thread~\n");
for(i = 0; i < cnt; i++)
{
sleep(1);
puts("running thread");
}
return (void*)msg;
}
쓰레드 detach
쓰레드 종료시 상위 실행 객체로 반환값을 전달해야 소멸된다. 위의 join 함수도 반환값을 전달하고 있기 때문에 소멸할 수 있었다. detach는 join과 다르게 함수 호출시 해당 쓰레드가 종료될 때가지 블로킹 상태에 있지 않는다. detach는 상위 실행 객체와 독립시키고 쓰레드가 종료될 때 알아서 소멸되도록 유도한다.
int pthread_detach(pthread_t thread);
- thread: 종료와 동시에 소멸시킬 쓰레드의 ID 정보를 전달
- 공유 자원으로 임계 영역이 생기는 경우
둘 이상의 쓰레드가 동시에 실행하면 문제를 일으키는 문장이 하나 이상 존재하는 함수를 일 컷는다. 보통 멀티 스레드가 공유 하는 자원에 대한 접근 방법에 따라 결과가 달라지며 이러한 구간을 임계 영역이라한다.
- 쓰레드에 안전한 함수 (thread-safe function): 동시에 호출되어도 안전한 함수
- 쓰레드에 불안전한 함수 (thead-unsafe function): 동시에 호출되면 불안전한 함수
thread-unsafe function 예제
#define _REENTRANT
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>
#define NUM_THREAD 100
#define LARGE 100000000
void* thread_inc(void *arg);
void* thread_dec(void* arg);
long long num = 0;
int main(int argc, char*argv[])
{
pthread_t thread_id[NUM_THREAD];
int i;
printf("sizeof long long: %zd\n", sizeof(long long));
for(i=0;i<NUM_THREAD;++i)
{
// even id thread
if(i%2)
pthread_create(&(thread_id[i]), NULL, thread_inc, NULL);
// odd id thread
else
pthread_create(&(thread_id[i]), NULL, thread_dec, NULL);
}
for(i = 0; i < NUM_THREAD; ++i)
pthread_join(thread_id[i], NULL);
printf("result: %lld \n", num);
return 0;
}
void* thread_inc(void* arg)
{
for(int i = 0; i < LARGE; ++i)
++num;
return NULL;
}
void* thread_dec(void* arg)
{
for(int i = 0; i < LARGE; ++i)
--num;
return NULL;
}
- 100개의 스레드를 생성하여 홀수 번호 쓰레드는 공유 변수 num에 대해 증가 연산을 실행하고 짝수 번호 쓰레드는 공유 변수 num에 대해 감소 연산을 실행한다.
- 위 실행 코드만 보았을 때는 공평한 실행으로 num의 결과가 0이 예상되지만 실제로는 다른 값이 나오게 된다.
동시 접근에 따른 실행 결과가 다르게 나오는 이유는 다음을 참고하면 좋겠다.
2021.01.15 - [Operating System] - [운영체제] 프로세스 동기화
위 문제를 해결하는 방법을 프로세스 동기화라고 한다.
동기화는 다음 두가지 상황에서 사용된다.
1. 동일한 메모리 영역으로 동시 접근이 발생하는 상황
2. 동일한 메모리 영역에 접근하는 쓰레드의 실행 순서를 지정해야하는 상황
1의 문제는 위에서 보았고 2같은 경우 특정 메모리에서 데이터를 기록하는 것과 읽는 것의 지켜야하는 순서가 반드시 있어야할 때 필요하다.
동기화 기법
뮤텍스(Mutex)
쓰레드의 동시 접근을 허용하지 않는다. 먼저 임계영역을 접근하는 쓰레드가 자물쇠를 잠그고 필요 과정을 수행한 뒤 임계영역을 빠져나갈 때 자물쇠를 푸는 방식이다.
생성과 삭제
int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr);
int pthread_mutex_destroy(pthread_mutex_t *mutex);
- mutex: 초기화/삭제하고자 하는 뮤텍스의 주소값은 전달
- attr: mutex 초기화시 전달하는 속성 정보
자물쇠를 잠그고 푸는 함수
int pthread_mutex_lock(pthread_mutex_t* mutex);
int pthread_mutex_unlock(pthread_mutex_t* mutex);
일반적으로 다음과 같이 사용한다.
pthread_mutex_lock(&mutex);
// ... critical section
// ...
// ... critical section ends
pthread_mutex_unlock(&mutex);
thread-unsafe function의 예제에서 임계영역 부분을 다음과 같이 바꾼다.
void* thread_inc(void* arg)
{
// lock mutex
pthread_mutex_lock(&mutex);
for(int i = 0; i < LARGE; ++i)
++num;
// unlock mutex
pthread_mutex_unlock(&mutex);
return NULL;
}
mutex를 잠그고 푸는 것을 for문 안에 삽입한다면 필요없이 동기화를 번복적으로 하게 되므로 쓰레드 접근 시 한 번의 동기화가 일어나도록 해준다.
세마포어(Semaphore)
임계 영역을 접근하는 쓰레드 수를 제어한다. 최대 접근 가능한 쓰레드 수는 초기화 시 공유 자원의 수로 설정하여 세마포어를 생성한다.
int sem_init(sem_t* sem, int pshared, unsigned int value);
int sem_destroy(sem_t *sem);
- sem: 초기화할 세마포어의 주소값을 전달
- pshared: 0이외의 값 전달 시, 둘 이상의 프로세스에 의해 접근 가능한 세마포어 생성.
- value: 생성되는 세마포어의 초기 값 지정.
int sem_post(sem_t *sem);
int sem_wait(sem_t* sem);
sem_post 호출 시 sem은 1 증가하고 sem_wait 호출 시 sem 1 감소. 진입한 쓰레드가 sem을 확인하여 0보다 크다면 진입 가능하고 아니라면 블로킹 상태에 있게 된다.
뮤텍스와 마찬가지로 임계영역을 보호한다.
sem_wait(&sem);
// ... critical section
// ...
// ... critical section end
sem_post(&sem);
2. 동일한 메모리 영역에 접근하는 쓰레드의 실행 순서를 지정해야하는 상황의 예로 두 쓰레드가 공유 변수에 대해 수 하나를 입력받아 증가시키는 예제를 본다. 입력과 증가의 역할을 나누어 순서대로 실행하도록 한다.
#define _REENTRANT
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
void* read(void *arg);
void* accu(void *arg);
static sem_t sem_one;
static sem_t sem_two;
static int num;
int main(int argc, char *argv[])
{
pthread_t id_t1, id_t2;
sem_init(&sem_one, 0, 0);
sem_init(&sem_two, 0, 1);
pthread_create(&id_t1, NULL, read, NULL);
pthread_create(&id_t2, NULL, accu, NULL);
pthread_join(id_t1, NULL);
pthread_join(id_t2, NULL);
sem_destroy(&sem_one);
sem_destroy(&sem_two);
return 0;
}
void* read(void* arg)
{
for(int i =0; i <5; ++i)
{
fputs("Input num: ", stdout);
sem_wait(&sem_two);
scanf("%d", &num);
sem_post(&sem_one);
}
return NULL;
}
void* accu(void* arg)
{
int sum = 0, i;
for(i = 0; i <5; ++i)
{
sem_wait(&sem_one);
sum += num;
sem_post(&sem_two);
}
printf("Result: %d\n", sum);
return NULL;
}
멀티쓰레드 기반의 다중접속 서버의 구현
서버 코드
클라이언트 연결
while(1)
{
clnt_adr_sz = sizeof(clnt_adr);
clnt_sock = accept(serv_sock, (struct sockaddr*)&clnt_adr, &clnt_adr_sz);
pthread_mutex_lock(&mtx);
clnt_socks[clnt_cnt++] = clnt_sock;
pthread_mutex_unlock(&mtx);
pthread_create(&t_id, NULL, handle_clnt, (void*)&clnt_sock);
pthread_detach(t_id);
printf("Connected client IP: %s\n", inet_ntoa(clnt_adr.sin_addr));
}
클라이어트 연결을 수락하고 소켓을 할당한다. 소켓을 보관하는 배열은 다른 스레드에서도 사용하므로 뮤텍스로 보호하도록한다.
클라이언트 처리
void* handle_clnt(void* arg)
{
int clnt_sock = *((int*)arg);
int str_len =0 , i;
char msg[BUF_SIZE];
while((str_len=read(clnt_sock, msg, sizeof(msg)))!=0)
send_msg(msg, str_len);
pthread_mutex_lock(&mtx);
for(i = 0; i < clnt_cnt; ++i)
{
if(clnt_sock == clnt_socks[i])
{
while(i++<clnt_cnt-1)
clnt_socks[i] = clnt_socks[i+1];
break;
}
}
clnt_cnt--;
pthread_mutex_unlock(&mtx);
close(clnt_sock);
return NULL;
}
handl_clnt 스레드는 에코 메세지를 전송하기 위해 메세지를 수신하고 클라이언트 소켓 종료를 처리하도록 한다. 연결 종료과정에서 소켓 배열을 접근하므로 뮤텍스로 보호하도록 한다.
메세지 송신
에코 메세지를 송신한다.
void send_msg(char* msg, int len)
{
int i;
pthread_mutex_lock(&mtx);
for(i=0;i<clnt_cnt;i++)
write(clnt_socks[i], msg, len);
pthread_mutex_unlock(&mtx);
}
클라이언트 코드
클라이어트측에서는 두 쓰레드를 생성하여 메세지 송신과 수신 역할을 분리한다. 멀티 프로세스 기반과 똑같은 모습이다.
#include "../network_header.h"
#define BUF_SIZE 100
#define NAME_SIZE 20
char name[20] = "[Default]";
char msg[BUF_SIZE] = {};
void* send_msg(void* arg);
void* recv_msg(void* arg);
int main(int argc, char* argv[])
{
int sock;
struct sockaddr_in serv_addr;
pthread_t snd_thread, rcv_thread;
void* thread_return;
sock = socket(PF_INET, SOCK_STREAM, 0);
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = inet_addr(DEFAULT_ADDRESS);
serv_addr.sin_port = htons(PORT);
if(connect(sock, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) == -1)
error_handling("connect() error");
fputs("connected\n", stdout);
pthread_create(&snd_thread, NULL, send_msg, (void*)&sock);
pthread_create(&rcv_thread, NULL, recv_msg, (void*)&sock);
pthread_join(snd_thread, &thread_return);
pthread_join(rcv_thread, &thread_return);
close(sock);
return 0;
}
// general sending message
void* send_msg(void* arg)
{
int sock = *((int*)arg);
char name_msg[NAME_SIZE+BUF_SIZE] = {};
while(1)
{
fgets(msg, BUF_SIZE, stdin);
if(!strcmp(msg, "q\n") || !strcmp(msg, "Q\n"))
{
close(sock);
exit(0);
}
sprintf(name_msg, "%s %s", name, msg);
write(sock, name_msg, strlen(name_msg));
}
return NULL;
}
// receiving message
void* recv_msg(void* arg) // read thread main
{
int sock = *((int*)arg);
char name_msg[NAME_SIZE + BUF_SIZE] = {};
int str_len;
while(1)
{
str_len = read(sock, name_msg, NAME_SIZE+BUF_SIZE-1);
if(str_len == -1)
return (void*)-1;
name_msg[str_len] = 0;
fputs(name_msg, stdout);
}
return NULL;
}