C , C++, C#

[C/C++] mutex 응용과 condition_value

vhxpffltm 2020. 3. 1. 16:06

이전시간에 Mutex에 대해 살펴보았다. 이번에는 이 mutex를 응용한 '생산자-소비자' 패턴을 알아보고 condion_value에 대해 간단하게 알아보자.

 

'생산자-소비자' 패턴은 멀티 스레딩하는데 많이 사용되며 '생산자'는 무언가 일을 처리하는 스레드를 받아오고 '소비자'는 받은 일을 처리하는 스레드를 의미한다.

 

아래의 전체코드를 한번 보자. 우리는 웹 페이지를 다운받는 시나리오를 가정한다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
#include<chrono>  // std::chrono::miliseconds
#include<condition_variable>  // std::condition_variable
#include<iostream>
#include<mutex>
#include<queue>
#include<string>
#include<thread>
#include<vector>
 
using namespace std;
 
void producer_f(queue<string> *down_page, mutex* m, int index) {
    for (int i = 0; i < 5; i++) {
        //다운로드하는데 걸리는 시간을 임의로 지정.
        //각 스레드별로 걸리는 시간이 다름
        std::this_thread::sleep_for(std::chrono::milliseconds(100 * index)); //시간만큼 스레드를 sleep, chrono는 시간 관련 라이브러리
        //producer 스레드 5개를 넣어야하기 때문에 뮤텍스 m을 사용하여 스레드들 사이의 문제가 안생기도록 함
 
        string content = "웹 페이지 : " + to_string(i) + " from Thread(" + to_string(index) + ")\n";
        m->lock(); //data는 스레드 사이에서 공유되므로 임계영역에 넣음
        down_page->push(content);
        m->unlock();
        //consumer에게 content가 준비되었음을 알림
 
    }
}
 
//큐에 원소를 넣는 함수, 페이지를 계속 다운로드하는 역할
void consumer_f(queue<string>* down_page, mutex* m, int* num_process) {
    while (*num_process < 25) {//전체 처리하는 페이지가 5*5
        //while무한루프로 큐가 비어있지 않을때까지 돌아야함, 
        m->lock();
        if (down_page->empty()) {// 다운로드한 페이지가 없다면 다시 대기
            m->unlock();
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
            //if문을 수십만번 호출하기 때문에 CPU자원의 낭비가 큼 -> 10 밀리초 뒤에 강제로 sleep 시켜 다시 확인
            continue;
        }
        //맨 앞의 페이지를 읽고 대기 목록에서 제거
        string content = down_page->front();
        down_page->pop();
 
        (*num_process)++;
        m->unlock(); // 다른 스레드에서도 다음 원소를 바로 처리할 수 있도록 함
        cout << content;
        std::this_thread::sleep_for(std::chrono::milliseconds(80));
    }
}
 
 
int main()
{
    //생산자 소비자 패턴: 현재 다운로드한 페이지 리소스라고 이해
    queue<string> down_page;
    mutex m;
    //condition_variable cv;
    vector<thread> producer; //producer(생산자) 스레드
    
    for (int i = 0; i < 5; i++)
        producer.push_back(thread(producer_f, &down_page, &m, i + 1));
    
    int num_process = 0;
    vector<thread> consumer; //consumer(소비자) 스레드
    for (int i = 0; i < 3; i++) {
        consumer.push_back(thread(consumer_f, &down_page, &m, &num_process));
    }
    for (int i = 0; i < 5; i++) producer[i].join();
    for (int i = 0; i < 3; i++) consumer[i].join();
    return 0;
}
http://colorscripter.com/info#e" target="_blank" style="color:#e5e5e5text-decoration:none">Colored by Color Scripter
 
 

 

 

결과는 위와 같다.

 

Producer (생산자)

 

먼저 vector<thread> producer; //producer(생산자) 스레드 는 페이지를 다운로드하는 역할을 한다. 다운받은 페이지는 'down_page' 라는 큐에 저장한다.

Producer_f() 함수를 보자. std::this_thread::sleep_for 함수는 인자로 전달된 시간 만큼 쓰레드를 sleep 시키는데, 이 때 해당 인자로 chrono 의 시간 객체를 받게 됩니다. chrono 는 시간 관련 라이브러리로 기존의 C 의 time.h 보다 훨씬 편리한 기능을 제공한다.

 100 * index 밀리초 만큼 쓰레드를 재우기 위해서는 std::chrono::milliseconds(100 * index) 로 사용한다.

그리고 다운 받은 웹사이트 내용을 'content' 라는 변수로 가정한다.

이제 다운 받은 페이지를 큐에 집어 넣는다. 이 때 주의할 점으로, producer 쓰레드가 1 개가 아니라 5 개이다. 그래서 down_page 에 접근하는 쓰레드들 사이에 뮤텍스 m 으로 해당 코드를 감싸 문제가 발생하지 않도록 한다.

 

consumer (소비자)

 

consumer 쓰레드 입장에서는 언제 일이 올지 알 수 없다. 그래서 down_page 가 비어있지 않을 때 까지 계속 무한루프를 돌려 확인해야한다. 문제는 컴퓨터 CPU 의 속도에 비해 다운받은 페이지가 큐에 추가되는 속도는 매우 느리다.

producer 의 경우 약 100ms 마다 웹사이트 정보를 큐에 추가하게 되는데, 이 시간 동안 'down_page->empty()' 문장을 수십 만 번 호출할 수 있으며, 이것은 CPU자원의 낭비이다. 그래서

 

 m->lock();

        if (down_page->empty()) {// 다운로드한 페이지가 없다면 다시 대기

            m->unlock();

            std::this_thread::sleep_for(std::chrono::milliseconds(10));

            //if문을 수십만번 호출하기 때문에 CPU자원의 낭비가 큼 -> 10 밀리초 뒤에 강제로 sleep 시켜 다시 확인

            continue;

        }

위와 같은 코드를 임의로 작성한다. 그리고 위 코드아래는 'content'를 구현한 부분인데 맨 앞의 원소를 얻은 뒤, 맨 앞의 원소를 큐에서 제거한다.

이 때 m->unlock 을 함으로써 다른 쓰레드에서도 다음 원소를 바로 처리할 수 있도록 해준다. content 를 처리하는 시간은 대충 80 밀리초가 소모된다고 가정하였다.

 

위 내용을 간단하게 요약하면 아래와 같다.

 

 

우리가 10 밀리초로 어느정도 여유를 두고 작업했지만 이것도 그렇게 효율적인 방법은 아니다. 우리는 일이 추가됐을때, 아래그림처럼, Consumer를 작동시켜 일을 처리하는 방법이 제일 효율적일 것이다. 어떻게 할 수 있을까?

 

 

 

Condition_value

 

조건 변수(condition_value)를 사용하여 위 경우에서 'down_page->empty()' 가 참이 아닐때까지 재울 수 있다. 수정된 코드는 아래와 같다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
#include<chrono>  // std::chrono::miliseconds
#include<condition_variable>  // std::condition_variable
#include<iostream>
#include<mutex>
#include<queue>
#include<string>
#include<thread>
#include<vector>
 
using namespace std;
 
void producer_f(queue<string> *down_page, mutex* m, int index, condition_variable* c) {
    for (int i = 0; i < 5; i++) {
        //다운로드하는데 걸리는 시간을 임의로 지정.
        //각 스레드별로 걸리는 시간이 다름
        std::this_thread::sleep_for(std::chrono::milliseconds(100 * index)); //시간만큼 스레드를 sleep, chrono는 시간 관련 라이브러리
        //producer 스레드 5개를 넣어야하기 때문에 뮤텍스 m을 사용하여 스레드들 사이의 문제가 안생기도록 함
 
        string content = "웹 페이지 : " + to_string(i) + " from Thread(" + to_string(index) + ")\n";
        m->lock(); //data는 스레드 사이에서 공유되므로 임계영역에 넣음
        down_page->push(content);
        m->unlock();
        c->notify_one(); //consumer에게 content가 준비되었음을 알림
    }
}
 
//큐에 원소를 넣는 함수, 페이지를 계속 다운로드하는 역할
void consumer_f(queue<string>* down_page, mutex* m, int* num_process,condition_variable* c) {
    while (*num_process < 25) {//전체 처리하는 페이지가 5*5 이시점에서 이미 lk 는 lock 이 되어 있습니다.
        unique_lock<mutex> mk(*m);
        c->wait(mk, [&] {return !down_page->empty() || *num_process == 25; });
        if (*num_process == 25) {
            mk.unlock();
            return;
        }
 
        //맨 앞의 페이지를 읽고 대기 목록에서 제거
        string content = down_page->front();
        down_page->pop();
 
        (*num_process)++;
        mk.unlock(); // 다른 스레드에서도 다음 원소를 바로 처리할 수 있도록 함
        cout << content;
        std::this_thread::sleep_for(std::chrono::milliseconds(80));
    }
}
 
 
int main()
{
    //생산자 소비자 패턴: 현재 다운로드한 페이지 리소스라고 이해
    queue<string> down_page;
    mutex m;
    condition_variable c;
    vector<thread> producer; //producer(생산자) 스레드
    
    for (int i = 0; i < 5; i++)
        producer.push_back(thread(producer_f, &down_page, &m, i + 1,&c));
    
    int num_process = 0;
    vector<thread> consumer; //consumer(소비자) 스레드
    for (int i = 0; i < 3; i++) {
        consumer.push_back(thread(consumer_f, &down_page, &m, &num_process,&c));
    }
    for (int i = 0; i < 5; i++) producer[i].join();
    c.notify_all(); // 나머지 자고있는 스레드들을 모두 깨움
    for (int i = 0; i < 3; i++) consumer[i].join();
    return 0;
}
http://colorscripter.com/info#e" target="_blank" style="color:#e5e5e5text-decoration:none">Colored by Color Scripter
 

 

위 코드는 main() 부분에 condition_variable c; 이 추가되었다. 이것이 이제 producer_f() 와 consumer_f() 함수에서 쓰이는걸 확인하는데 consumer_f() 부터 보자.

 

consumer_f()

 

unique_lock<mutex> mk(*m);

 

이 코드는 lock_guard<> 와 거의 동일하지만 lock_guard<> 의 경우 생성자 말고는 따로 lock() 을 할 수 없는데, unique_lock  unlock 후에 다시 lock 할 수 있습니다. 추가로 cv->wait()가 unique_lock<> 을 인자로 받는다.

이시점에서 이미 mk 는 lock 이 되어 있다.

 

 

 c->wait(mk, [&] {return !down_page->empty() || *num_process == 25; });

      

condition_variable  wait() 함수에 어떤 조건이 참이 될 때 까지 기다릴지 해당 조건을 인자로 전달한다. 위의 경우 조건으로 '람다함수' 를 전달하였는데, down_page가 비어있지 않거나, 전체 처리된 페이지 개수가 25개 일 때 wait()함수를 멈추도록 한다.

조건 변수(Condition_value)는 해당 조건이 거짓이라면, mk  unlock 한 뒤에, 영원히 sleep 하게 된다. 이 때 이 쓰레드는 다른 누군가 깨워주기 전까지 계속 sleep 된 상태로 기다리게 된다. 한 가지 중요한 점이라면 mk  unlock() 한다는 것이다.

반대로 해당 조건이 참이라면, cv.wait() 는 그대로 리턴해서 consumer  content 를 처리하는 부분이 그대로 실행된다.

 

 

 

if (*num_process == 25) {

            mk.unlock();

            return;

        }

 

마지막으로 위 코드는 모든 페이지 처리를 완료해서 인지, 아니면 정말 down_page에 페이지가 추가됬는지 알 수 없기 때문에 만약, 모든 페이지 처리가 끝나서 탈출한 것였다면, 그냥 쓰레드를 종료해야 한다.

 

Producer_f()

 

생산자 함수에서는  c->notify_one(); 코드만 추가되었다. 

만약에 페이지를 하나 다운 받았다면, 잠자고 있는 쓰레드들 중 하나를 깨워서 일을 시켜야한다. (만약에 모든 쓰레드들이 일을 하고 있는 상태라면 아무 일도 일어나지 않는다.) notify_one() 함수는 말 그대로,조건이 거짓인 바람에 자고 있는 쓰레드 중 하나를 깨워서 조건을 다시 검사하게 한다. 만일 조건이 참이 된다면 그 쓰레드가 다시 일을 시작하게된다.

 

main() 함수로 돌아가, producer 들이 모두 일을 끝낸 시점을 본다면, 자고 있는 일부 consumer 쓰레드들이 있을 것이고 만약에 cv.notify_all() 을 하지 않는다면, 자고 있는 consumer 쓰레드들의 경우 join 되지 않는 문제가 발생한다.

마지막으로 cv.notify_all() 을 통해서 모든 쓰레드를 깨워서 조건을 검사하도록 합니다. 해당 시점에선 이미 num_processed 가 25 가 되어 있을 것이므로, 모든 쓰레드들이 잠에서 깨어나 종료하게 된다.

 

Refernce

https://modoocode.com/252c

++17 STL 프로그래밍

'C , C++, C#' 카테고리의 다른 글

[C#] WinForm을 이용한 클래스  (0) 2020.03.25
[C#] WinForm을 이용한 계산기  (0) 2020.03.23
[C/C++] Mutex  (0) 2020.02.22
[C/C++] Thread  (0) 2020.01.19
[C/C++] Callable, std::function  (0) 2020.01.14