Concurrency Runtime 에서는 기존의 멀티스레드 프로그래밍할 때, 필수적인 스레드를 생성하는 함수( CreateThread(), _beginthread() 등 )와 같은 기능을 제공합니다.
이 글에서는 어떻게 위와 같은 기능을 제공하고, 어떻게 사용하면 되는지 알아보도록 하겠습니다.
ScheduleTask
이 전에 설명했던 Parallel Patterns Library( 이하, PPL ) 이나 Asynchronous Agents Library( 이하, AAL ) 을 사용하게 되면 암묵적으로 스레드가 생성되고, 관리되기 때문에, 아주 간단한 작업을 처리할 때에는 비교적 높은 오버헤드( overhead )를 갖게 됩니다.
그래서 Concurrency Runtime 에서는 간단한 작업들을 처리하기에 비교적 적은 오버헤드를 갖는 기능을 제공합니다.
이 간단한 작업을 처리하는 방법은 기존의 스레드를 생성하는 방법과 유사합니다. 즉, 직접 스레드 코드를 제어하고 싶을 때, 사용할 수 있습니다.
아쉬운 점은 간단한 작업의 처리를 완료했을 때, 알려주지 않습니다. 스레드 핸들을 사용하지 않기 때문에 WaitForSingleObject() 와 같은 함수도 사용할 수 없습니다.
그렇기 때문에 스레드로 수행될 함수의 인자를 넘길 때, 스레드 종료를 알리는 메커니즘에 필요한 데이터를 포함해주어야 합니다.( 또는 전역적으로.. )
Concurrency Runtime 에서는 위와 같이 스레드를 생성하는 기능을 제공하지만, 간단한 작업이 아니라면 PPL 또는 AAL 을 사용하는 것을 권하고 있습니다.
이러한 기능은 ScheduleGroup::ScheduleTask(), CurrentScheduler::ScheduleTask(), Scheduler::ScheduleTask() 를 호출하여 사용합니다.
매개변수의 내용은 위의 ScheduleGroup::ScheduleTask() 와 같습니다.
예제
위에 언급된 내용과 함수들을 어떻게 사용하는 알아보도록 하겠습니다.
시나리오
새로운 스레드를 생성하고, 그 스레드에서 인자로 전달 받은 구조체의 내용을 출력하는 내용입니다.
코드
#include <iostream>
#include <concrt.h>
using namespace std;
using namespace Concurrency;
void __cdecl MyThreadFunction( void* pParam );
// 스레드에서 사용할 데이터
struct MyData
{
int val1;
int val2;
event signal;
};
int main()
{
MyData* pData = new MyData;
if( nullptr == pData )
return 1;
pData->val1 = 50;
pData->val2 = 100;
// _beginthreadex() 처럼 스레드를 생성하고 인자로 넘어간 함수를 생성된 스레드에서 수행한다.
CurrentScheduler::ScheduleTask( MyThreadFunction, pData );
// 수행 중인 작업이 끝날 때까지 대기.
pData->signal.wait();
// 작업이 끝난 후, 자원 해제.
delete pData;
return 0;
}
[ 코드2. CurrentScheduler::ScheduleTask() 를 사용하여 스레드를 생성하는 예제 ]
간단한 구조체를 생성하고, 스레드에서 수행될 함수에 전달합니다.
스레드에서는 구조체의 정보를 출력합니다.
스레드의 수행이 종료되기 전에 구조체를 제거하면 안되므로, event 객체를 이용해 스레드가 종료될 때까지 기다린 후, 제거합니다.
마치는 글
Concurrency Runtime 을 사용 중에 간단하게 스레드를 생성해야 한다면, 기존의 스레드 생성 함수를 사용하는 것보다 일관성 있게 ScheduleTask 를 사용하는 것이 좋습니다.
하지만 이 글에서 언급했듯이 간단하지 않은 작업이라면 PPL 이나 AAL 을 사용하는 것이 좋습니다.
다음 글에서는 Scheduler 에서 제공하는 Context 에 대해서 알아보도록 하겠습니다.
Scheduler 는 항상 하나 이상의 스케줄 그룹을 가지고 있습니다. 스케줄 그룹이 무엇인지 알아보고 동시성 프로그래밍에 어떤 영향을 주는지 알아보도록 하겠습니다.
SchedulerGroup
SchedulerGroup 이란 Scheduler 가 스케줄링 해야 할 작업들을 모아 놓은 스케줄 그룹을 대표하는 클래스입니다.
정책
이전 글에서 본 스케줄러 정책들 중 SchedulingProtocol 정책은 스케줄링 순서를 변경하여 작업들의 처리 순서를 변경할 수 있습니다. ScheduleringProtocol 정책이 EnhanseScheduleGroupLocality 로 설정된 경우에는 스케줄링 중인 스케줄 그룹을 변경하지 않고, 현재 스케줄링 중인 스케줄 그룹을 다른 스케줄 그룹보다 먼저 처리합니다. 반면에 SchedulingProtocol 정책이 EnhanseForwardProgress 로 설정된 경우에는 다른 스케줄링 그룹의 작업을 처리합니다. 스케줄 그룹 간에 작업을 공평하게 처리하게 됩니다.
생성
CurrentScheduler::CreateScheduleGroup() 또는 Scheduler::CreateScheduleGroup() 을 통해 ScheduleGroup 객체를 생성할 수 있습니다.
Scheduler 가 참조 개수를 사용하여 수명을 관리하는 것처럼 ScheduleGroup 객체 또한 참조 개수를 사용합니다. 생성되는 순간 참조 개수는 1이 됩니다. SchedulerGroup::Reference() 는 참조 개수는 1이 증가하고, SchedulerGroup::Release() 는 참조 개수가 1이 감소하게 됩니다.
동작 및 사용
Scheduler 객체에는 기본적으로 기본 ScheduleGroup 을 가지고 있습니다. 명시적으로 ScheduleGroup 을 생성하지 않는다면 스케줄링 해야 할 작업들은 기본 ScheduleGroup 에 추가됩니다.
ScheduleGroup 은 Concurrency Runtime 중 Asynchronous Agents Library( 이하, AAL ) 에서 사용할 수 있습니다. agent 클래스나 message block 들의 생성자로 ScheduleGroup 을 전달하여 사용할 수 있습니다.
최대 동시성 리소스( computing core ) 의 개수를 2개로 하고, 한 번은 SchedulingProtocol 정책을 EnhanceScheduleGroupLocality 로 하고, 한 번은 EnhanceForwardProgress 으로 하여 작업의 순서를 알아보았습니다.
2개의 스케줄 그룹을 생성하고, 하나의 그룹에 2개의 작업을 추가하여, 총 4개의 작업이 수행됩니다.
예제의 spin_loop() 는 어떤 작업을 처리하는 것을 의미하고 wait() 는 양보를 뜻합니다. wait() 가 호출되면 다른 작업이 스케줄링 되는데 이 때 결정되는 작업이 어떤 작업인지는 앞서 설정한 SchedulingProtocol 정책에 따릅니다.
EnhanceScheduleGroupLocality 로 설정된 경우에는 같은 스케줄 그룹 내의 작업이 처리되는 반면에, EnhanceForwardProgress 로 설정된 경우에는 다른 스케줄 그룹의 작업이 처리되게 됩니다.
[ 그림1. 처리되는 스케줄 그룹의 순서를 확인하는 예제 실행 결과 ]
마치는 글
이렇게 스케줄링 정책과 스케줄 그룹을 통해서 스케줄링 순서를 결정하는데 관여할 수 있다는 것을 알아보았습니다.
다음 글에서는 스레드를 직접 생성하여 사용할 수 있는 ScheduleTask 라는 것에 대해서 알아보도록 하겠습니다.
위와 같이 생성한 SchedulerPolicy 객체는 3개의 정책을 설정하는데, 사용할 최소 동시성 자원( computing core ) 의 개수는 2개, 사용할 최대 동시성 자원의 개수는 4개, 현재 컨텍스트의 스레드 우선순위를 최고 순위로 설정하게 됩니다.
위 코드에서 보여드린 정책들 뿐만 아니라 여러 가지 정책들을 설정할 수 있습니다.
설정할 수 있는 정책들
PolicyElementKey 열거형으로 설정할 수 있는 정책들을 정의하고 있습니다. 그 내용은 다음과 같습니다.
정책 키
설명
기본 값
SchedulerKind
작업들을 수행할 때, 일반 스레드를 사용할지, UMS 스레드를 사용할지 설정
ThreadScheduler
MaxConcurrency
스케줄러에서 사용할 최대 동시성 자원 수
MaxExecutionResources
MinConcurrency
스케줄러에서 사용할 최소 동시성 자원 수
1
TargetOversubscriptionFactor
작업을 수행 시, 자원에 할당할 스레드의 수
1
LocalContextCacheSize
캐시할 수 있는 컨텍스트 수
8
ContextStackSize
각 컨텍스트에서 사용할 스택 크기( KB )
0( 기본 스택 크기 사용 )
ContextPriority
각 컨텍스트의 스레드 우선 순위
THREAD_PRIORITY_NORMAL
SchedulingProtocal
스케줄 그룹의 작업 예약 알고리즘
EnhanceScheduleGroupLocality
DynamicProgressFeedback
자원 통계 정책, 사용 금지.
ProgressFeedbackEnabled
[ 표1. PolicyElementKey 열거형에 정의된 설정할 수 있는 정책들 ]
기본 정책 설정
Asynchronous Agents Library( 이하, AAL ) 에는 우리가 생성한 정책을 지정한 Scheduler 객체를 사용할 수 있지만, Parallel Patterns Library ( 이하, PPL ) 에는 우리가 생성한 Scheduler 객체를 사용할 수 없고, 내부적으로 생성되는 기본 스케줄러를 사용하게 됩니다. 기본 스케줄러는 기본 정책을 사용하게 되는데, 이 기본 정책을 미리 설정해 둘 수 있습니다.
SetDefaultSchedulerPolicy() 의 인자로 SchedulerPolicy 를 생성하여 설정하면, 직접 Scheduler 를 생성하여 사용하지 않더라도, 내부에서 생성되는 기본 스케줄러에도 정책을 설정할 수 있습니다.
스케줄러 정책 획득
이미 생성된 Scheduler 객체로부터 설정된 정책을 가져올 수 있습니다.
_CRTIMP static SchedulerPolicy __cdecl GetPolicy(); // CurrentScheduler::GetPolicy() static function
virtual SchedulerPolicy GetPolicy(); // Scheduler::GetPolicy() member function
[ 코드4. GetPolicy() 들의 선언 ]
Scheduler 객체로부터 얻어온 정책은 설정할 때의 정책과 다를 수 있습니다. 정책을 설정하더라도, 해당 시스템에서 설정 가능한 정책만 적용되고, 그렇지 않은 경우에는 기본 정책 값이나 Resource Manager 에 의해 적당한 값으로 적용됩니다.
예를 들어 UMS 를 사용할 수 없는 시스템에서 UMS 를 사용하라고 설정하더라도, ThreadScheduler로 설정됩니다.
예제
Scheduler 를 사용하는 방법과 SchedulerPolicy 인해 어떤 영향을 받는지 알아보는 예제를 보도록 하겠습니다.
시나리오
문자열 순열을 구하는 프로그램입니다. 문자열 순열이란 문자열을 이루는 알파벳들로 만들 수 있는 모든 경우의 수를 말합니다.
문자열 순열을 구하는 작업은 오랜 시간이 걸리므로 agent 를 이용해 비 동기 처리를 합니다. 그리고 문자열 순열을 구하는 작업을 하는 agent 와 통신을 하며 진행 상황을 출력해주는 agent 가 비 동기로 처리됩니다.
이 때, Concurrency Runtime 은 협조적 스케줄링을 하기 때문에 바쁜 스케줄러에 더 많은 자원을 할당합니다. 반대로 바쁘지 않은 스케줄러에는 적은 자원이 할당됩니다. 이로 인해 진행 상황을 출력하는 agent 가 제대로 스케줄링되지 않는 현상이 발생하게 됩니다.
이의 해결책으로 진행 상황을 출력하는 agent 의 스레드 우선 순위를 높게 설정합니다.
코드
#include <Windows.h>
#include <ppl.h>
#include <agents.h>
#include <iostream>
#include <sstream>
using namespace std;
using namespace Concurrency;
// 문자열 순열을 구하는 agent
class permutor
: public agent
{
public:
explicit permutor( ISource< wstring >& source, ITarget< unsigned int >& progress )
: source( source )
, progress( progress ) { }
explicit permutor( ISource< wstring >& source, ITarget< unsigned int >& progress, Scheduler& scheduler )
: agent( scheduler )
, source( source )
, progress( progress ) { }
protected:
void run()
{
wstring s = receive( this->source );
this->permute( s );
this->done();
}
unsigned int factorial( unsigned int n )
{
if( 0 == n )
return 0;
if( 1 == n )
return 1;
return n * this->factorial( n - 1 );
}
wstring permutation( int n, const wstring& s )
{
wstring t( s );
size_t len = t.length();
for( unsigned int i = 2; i < len; ++i )
{
swap( t[ n % i ], t[i] );
n = n / i;
}
return t;
}
void permute( const wstring& s )
{
unsigned int permutation_count = this->factorial( s.length() );
long count = 0;
unsigned int previous_percent = 0u;
send( this->progress, previous_percent );
parallel_for( 0u, permutation_count, [&]( unsigned int i )
{
this->permutation( i, s );
unsigned int percent = 100 * InterlockedIncrement( &count ) / permutation_count;
if( percent > previous_percent )
{
send( this->progress, percent );
previous_percent = percent;
}
} );
send( this->progress, 100u );
}
private:
ISource< wstring >& source;
ITarget< unsigned int >& progress;
};
// 진행 상황을 출력하는 agent
class printer
: public agent
{
public:
explicit printer( ISource< wstring >& source, ISource< unsigned int >& progress )
: source( source )
, progress( progress ) { }
explicit printer( ISource< wstring >& source, ISource< unsigned int >& progress, Scheduler& scheduler )
: agent( scheduler )
, source( source )
, progress( progress ) { }
protected:
void run()
{
wstringstream ss;
ss << L"Computing all permutations of '" << receive( this->source ) << L"'..." << endl;
wcout << ss.str();
unsigned int previous_percent = 0u;
while( true )
{
unsigned int percent = receive( this->progress );
if( percent > previous_percent || percent == 0u )
{
wstringstream ss;
ss << L'\r' << percent << L"% complete...";
wcout << ss.str();
previous_percent = percent;
}
if( 100 == percent )
break;
}
wcout << endl;
this->done();
}
private:
ISource< wstring >& source;
ISource< unsigned int >& progress;
};
// agent 의 작업을 관리하는 함수
void permute_string( const wstring& source, Scheduler& permutor_scheduler, Scheduler& printer_scheduler )
{
single_assignment< wstring > source_string;
unbounded_buffer< unsigned int > progress;
permutor agent1( source_string, progress, permutor_scheduler );
printer agent2( source_string, progress, printer_scheduler );
agent1.start();
agent2.start();
send( source_string, source );
agent::wait( &agent1 );
agent::wait( &agent2 );
}
int main()
{
const wstring source( L"Grapefruit" );
// 기본 정책으로 작업을 수행
Scheduler* pDefault_scheduler = CurrentScheduler::Get();
wcout << L"With default scheduler: " << endl;
permute_string( source, *pDefault_scheduler, *pDefault_scheduler );
wcout << endl;
// 진행 상황을 출력하는 agent 에 필요한 스레드 우선 순위를 높게 하는 정책을 설정하여 적용
SchedulerPolicy printer_policy( 1, ContextPriority, THREAD_PRIORITY_HIGHEST );
Scheduler* pPrinter_scheduler = Scheduler::Create( printer_policy );
HANDLE hShutdownEvent = CreateEvent( NULL, FALSE, FALSE, NULL );
pPrinter_scheduler->RegisterShutdownEvent( hShutdownEvent );
wcout << L"With higher context priority: " << endl;
permute_string( source, *pDefault_scheduler, *pPrinter_scheduler );
wcout << endl;
pPrinter_scheduler->Release();
WaitForSingleObject( hShutdownEvent, INFINITE );
CloseHandle( hShutdownEvent );
}
[ 코드5. Scheduler 객체에 스레드 우선 순위를 높인 SchedulerPolicy 객체를 적용한 예제 ]
기본 정책으로 수행했을 때에는 작업 진행 상황이 제대로 출력되지 않는 반면에, 스레드 우선 순위를 높게 설정한 경우에는 제대로 출력되는 것을 보실 수 있습니다.
[ 그림1. SchedulerPolicy 로 스레드 우선 순위를 변경한 예제 실행 결과 ]
마치는 글
이번 글에서는 Scheduler 의 기본적인 기능 중 하나인 SchedulerPolicy 를 설정하는 방법을 알아보았습니다. SchedulerPolicy 를 이용하여 기본 정책으로 해결되지 않는 다양한 문제점들을 해결 하실 수 있을 것입니다.
이번 글은 Parallel Patterns Library( 이하 PPL ) 과 Asynchronous Agents Library( 이하 AAL ) 내부에서 스케줄링을 하는 Scheduler 에 대해서 알아보도록 하겠습니다.
Scheduler class
Scheduler 클래스는 Concurrency Runtime 에서 실제로 스케줄링을 하는 객체입니다. 우리는 Scheduler 객체를 사용해서 스케줄링의 방법을 설정할 수 있습니다.
Scheduler 는 내부적으로 작업들을 그룹화한 ScheduleGroup 을 관리합니다. 또한 요청된 작업을 수행하는 Context 객체에 접근할 수 있도록 하여, 좀 더 구체적인 스케줄링을 할 수 있도록 도와줍니다.
Scheduler 생성
우리가 직접 Scheduler 를 생성하지 않아도, Concurrency Runtime 내부에서 기본 Scheduler 가 생성되어 스케줄링을 하게 됩니다. 이 경우에는 스케줄링 정책을 바꿀 수는 있으나, 세밀하게 제어할 수 없습니다.
기본 Scheduler 외에 직접 우리가 Scheduler 를 생성하는 방법은 2 가지가 있습니다.
CurrentScheduler::Create() 는 현재 컨텍스트와 연결하는 Scheduler 를 만듭니다.
Scheduler::Create() 는 현재 컨텍스트와 연결되지 않는 Scheduler 를 만듭니다.
Scheduler 는 내부적으로 참조 개수( reference counting ) 을 사용하여, 수명을 관리합니다. 그래서 참조 개수가 0이 되면 Scheduler 가 소멸됩니다.
Scheduler::Create()
현재 컨텍스트와 연결되지 않은 Scheduler 를 생성합니다. 참조 개수가 1로 설정됩니다.
Scheduler::Attach()
현재 컨텍스트와 Scheduler 를 연결합니다. 참조 개수가 증가합니다.
Scheduler::Reference()
참조 개수가 증가합니다.
Scheduler::Release()
참조 개수가 감소합니다. 참조 개수가 0이 되면 소멸됩니다.
CurrentScheduler::Create()
현재 컨텍스트와 연결된 Scheduler 를 생성합니다. 참조 개수가 1로 설정됩니다.
CurrentScheduler::Detach()
현재 컨텍스트를 분리합니다. 참조 개수가 감소합니다. 참조 개수가 0이 되면 소멸됩니다.
생성과 소멸, 연결과 분리
위와 같은 함수들을 제공하지만, 생성과 소멸, 연결과 분리가 짝을 이루어야 합니다.
CurrentScheduler::Create() 로 생성하였다면, CurrentScheduler::Detach() 로 소멸시키는 것이 좋습니다.
Scheduler::Create() 로 생성하고, Scheduler::Attach() 로 연결하였다면, Scheduler::Detach() 로 해제하고, Scheduler::Release() 로 소멸해야 합니다.
만약 Scheduler::Reference() 를 통해 참조 개수를 증가시켰다면, Scheduler::Release() 를 사용하여 참조 개수를 감소시켜주어야 합니다.
소멸 시점 알림
모든 작업이 끝나기 전에는 Scheduler 를 소멸시키지 않습니다. 언제 Scheduler 가 소멸되는지 알기 위해서는 RegisterShutdownEvent() 를 사용하여 Windows API 의 EVENT 객체를 지정해 주고, WaitForSingleObject() 를 사용하여 소멸될 때까지 대기할 수 있습니다.
그 외의 멤버 함수
위에서 설명한 멤버 함수 이외에 제공하는 멤버 함수들을 알아보도록 하겠습니다.
CurrentScheduler
Get() – 현재 컨텍스트에 연결된 Scheduler 의 포인터를 반환합니다. 참조 개수가 증가하지 않습니다.
CreateScheduleGroup() - ScheduleGroup 을 생성합니다.
ScheduleTask() – Scheduler 의 일정 큐에 간단한 작업을 추가합니다.
GetPolicy() – 연결된 정책의 복사본을 반환합니다.
Scheduler
CreateScheduleGroup() – ScheduleGroup 을 생성합니다.
ScheduleTask() – Scheduler 의 일정 큐에 간단한 작업을 추가합니다.
GetPolicy() – 연결된 정책의 복사본을 반환합니다.
SetDefaultSchedulePolicy() – 기본 Scheduler 에 적용될 정책을 설정합니다.
ResetDefaultSchedulePolicy() – 기본 Scheduler 의 정책을 SetDefaultSchedulerPolicy() 를 사용하기 전의 정책으로 설정합니다.
마치는 글
이번 글에서는 Concurrency Runtime 의 Scheduler 에 대해서 알아보았습니다. 위의 설명만으로는 어떻게 사용해야 하는지, 어떤 기능을 하는지 알기 어렵습니다.
다음 글에서 위에서 소개해드린 멤버 함수들의 사용 방법과 활용 예제들에 대해서 알아보도록 하겠습니다.
매개 변수인 _FWaitAll 은 지정된 event 들이 모두 설정될 때까지 기다릴 것인지 여부입니다. false 를 지정하면 하나의 event 라도 설정되면 기다리는 것을 멈추고 계속 진행됩니다.
마지막 매개 변수인 _Timeout 은 최대 시간입니다. 기본 매개 변수인 COOPERATIVE_TIMEOUT_INFINITE 는 무한대를 나타냅니다.
매개 변수 중 _FWaitAll 을 false 로 지정하고, 하나의 event 가 설정되었을 때, 설정된 event 의 _PPEvents 로 지정된 배열의 인덱스가 반환됩니다.
_FWaitAll 을 true 로 지정했을 경우에 모든 event 가 설정되었을 때에는 COOPERATIVE_WAIT_TIMEOUT 이 아닌 값이 반환됩니다.
Windows API 의 이벤트 객체를 사용할 때 함께 사용하는 WaitForMultipleObject() 와 유사합니다.
예제
Windows API 의 이벤트 객체와 어떻게 다른지 알아볼 수 있는 예제를 구현해보겠습니다.
시나리오
우선 최대 2개의 작업이 동시에 수행될 수 있도록 설정합니다.
그리고 하나의 event 를 생성한 후, 5개의 작업을 병렬로 처리합니다. 각 작업마다 생성한 event 가 설정될 때까지 기다리도록 합니다.
메인 스레드에서 1 초 후에 그 event 를 설정합니다.
같은 작업을 Windows API 의 이벤트 객체를 사용해서 구현합니다.
코드
// 코드의 출처는 msdn 입니다.
#include <windows.h>
#include <concrtrm.h>
#include <ppl.h>
#include <iostream>
#include <sstream>
using namespace Concurrency;
using namespace std;
// Demonstrates the usage of cooperative events.
void RunCooperativeEvents()
{
// An event object.
event e;
// Create a task group and execute five tasks that wait for
// the event to be set.
task_group tasks;
for (int i = 0; i < 5; ++i)
{
tasks.run([&] {
// Print a message before waiting on the event.
wstringstream ss;
ss << L"\t\tContext " << GetExecutionContextId()
<< L": waiting on an event." << endl;
wcout << ss.str();
// Wait for the event to be set.
e.wait();
// Print a message after the event is set.
ss = wstringstream();
ss << L"\t\tContext " << GetExecutionContextId()
<< L": received the event." << endl;
wcout << ss.str();
});
}
// Wait a sufficient amount of time for all tasks to enter
// the waiting state.
Sleep(1000L);
// Set the event.
wstringstream ss;
ss << L"\tSetting the event." << endl;
wcout << ss.str();
e.set();
// Wait for all tasks to complete.
tasks.wait();
}
// Demonstrates the usage of preemptive events.
void RunWindowsEvents()
{
// A Windows event object.
HANDLE hEvent = CreateEvent(NULL, TRUE, FALSE, TEXT("Windows Event"));
// Create a task group and execute five tasks that wait for
// the event to be set.
task_group tasks;
for (int i = 0; i < 5; ++i)
{
tasks.run([&] {
// Print a message before waiting on the event.
wstringstream ss;
ss << L"\t\tContext " << GetExecutionContextId()
<< L": waiting on an event." << endl;
wcout << ss.str();
// Wait for the event to be set.
WaitForSingleObject(hEvent, INFINITE);
// Print a message after the event is set.
ss = wstringstream();
ss << L"\t\tContext " << GetExecutionContextId()
<< L": received the event." << endl;
wcout << ss.str();
});
}
// Wait a sufficient amount of time for all tasks to enter
// the waiting state.
Sleep(1000L);
// Set the event.
wstringstream ss;
ss << L"\tSetting the event." << endl;
wcout << ss.str();
SetEvent(hEvent);
// Wait for all tasks to complete.
tasks.wait();
// Close the event handle.
CloseHandle(hEvent);
}
int wmain()
{
// Create a scheduler policy that allows up to two
// simultaneous tasks.
SchedulerPolicy policy(1, MaxConcurrency, 2);
// Attach the policy to the current scheduler.
CurrentScheduler::Create(policy);
wcout << L"Cooperative event:" << endl;
RunCooperativeEvents();
wcout << L"Windows event:" << endl;
RunWindowsEvents();
}
[ 코드1. event 와 Windows API 이벤트 객체와의 차이 ]
event 의 경우 5 개의 작업 중 동시에 2개가 수행되도록 하여 2 개의 작업이 기다리고 있을 때, 기다리는 스레드 자원은 다른 작업을 하게 됩니다. 이것이 바로 협력적인 스케쥴링입니다.
반면에 Windows API 의 이벤트 객체를 사용할 경우, 2 개의 작업이 다시 시작될 때까지 스레드 자원은 낭비를 하게 됩니다.
[ 그림1. event와 Windows API 이벤트 객체와의 차이 예제 결과 ]
마치는 글
Concurrency Runtime 에서 제공하는 동기화 객체인 event 까지 알아보았습니다. 이렇게 해서 제공하는 모든 동기화 객체를 알아보았습니다.
동기화 객체도 알아보았으니 이제 사용자 정의 message block 을 구현할 준비가 다 된 것 같습니다.
다음 글에서는 제공하는 message block 이 외에 사용자가 구현사여 사용하는 message block 에 대해서 알아보겠습니다.
기본적으로 Asynchronous Agents Library( 이하 AAL ) 에서 제공하는 message block 들은 스레드에 안전합니다. 이 말은 내부적으로 동기화를 구현하고 있다는 말입니다.
하지만 Concurrency Runtime 을 이용한 동시 프로그래밍을 할 때 AAL 의 message block 을 사용하지 않고 일반적인 사용자 변수들을 사용할 수 있습니다. 하지만 이런 변수들은 동기화가 되지 않아 직접 동기화를 해 주어야 합니다.
이 때 Windows API 의 유저 모드 혹은 커널 모드의 동기화 객체를 사용할 수 있지만, Concurrency Runtime 에서 제공하는 동기화 객체를 쉽게 사용할 수 있습니다.
이번 글에서는 Concurrency Runtime 에서 제공하는 동기화 객체들에 대해서 알아보도록 하겠습니다.
critical_section
임계 영역을 의미하는 객체로, 어떤 영역을 잠그고 풀 수 있습니다. 잠긴 영역에 도달한 다른 스레드는 잠김이 풀릴 때까지 기다려야 합니다. 그러므로 해당 영역을 모두 사용한 후에는 다른 스레드도 사용할 수 있도록 풀어주어야 합니다.
이 기능은 Windows API 에서 제공하는 유저 모드 동기화 객체인 CRITICAL_SECTION 과 같은 기능을 합니다.
멤버 함수
생성자와 소멸자를 제외한 public 인 멤버 함수들에 대해서 알아보겠습니다.
void lock()
해당 영역을 잠급니다. 잠근다는 의미는 영역의 접근 권한을 획득한다는 의미도 있습니다. 잠긴 영역에 도달한 다른 스레드들은 잠김이 풀릴 때까지 기다려야 합니다.
bool try_lock()
해당 영역의 접근을 시도합니다. 이미 다른 스레드에 의해 잠겨있더라도 풀릴 때까지 기다리지 않고 반환합니다.
이미 다른 스레드에 의해 잠겨있다면 false 를, 아니면 true 를 반환합니다.
void unlock()
접근 영역의 접근 권한을 반환합니다. 즉, 잠금을 해제합니다. 이 함수가 호출된 후에 다른 스레드가 이 영역에 접근할 수 있습니다.
native_handle_type native_handle()
native_handle_type 은 critical_section 의 참조입니다. 결국, 이 함수는 자기 자신의 참조를 반환합니다.
범위 잠금
범위 잠금이란 코드 내의 블럭의 시작부터 블럭이 끝날 때까지 잠그는 것을 말합니다. 이것은 생성자에서 잠그고, 소멸자에서 잠금을 해제하는 원리( RAII – Resource Acquisition Is Initialization )로 구현되어 있으며, 굳이 unlock() 을 호출하지 않아도 된다는 편리함이 있습니다.
critical_section::scoped_lock
critical_section 객체를 범위 잠금 객체로 사용합니다. 생성자로 critical_section 의 참조를 넘겨주어야 합니다. 스택에 할당되고 가까운 코드 블럭이 끝날 때, 소멸자가 호출되어 잠금이 해제됩니다.
reader_writer_lock
reader_writer_lock 은 쓰기 접근 권한과 읽기 접근 권한을 구분하여 획득하고 잠글 수 있는 객체입니다.
한 스레드가 쓰기 접근 권한을 획득하여 잠긴다면, 다른 스레드의 모든 쓰기 / 읽기 접근 권한 획득시도가 실패하며 기다리게 됩니다.
만약 한 스레드가 읽기 접근 권한을 획득하여 잠긴다면, 다른 스레드가 읽기 접근 권한 획득 시도 시, 동시에 읽기 접근 권한을 획득 가능합니다.
위에 설명한 것처럼 2 가지 종류의 접근 권한이 있지만, 잠기는 것은 단 하나의 객체라는 것을 명심하셔야 합니다. 이 사실을 잊으면 헷갈릴 수 있습니다.
reader_writer_lock 의 이와 같은 기능은 Windows API 의 SRW( Slim Reader Writer ) 잠금과 유사합니다. 다른 점이 있다면 reader_writer_lock 은 기본 잠금이 쓰기 잠금으로 되어 있고, 잠겼을 때 다른 스레드들이 기다린 순서대로 접근 권한을 얻는다는 것입니다. 즉, Windows SRW 는 기본 잠금이라는 개념이 없고, 기다린 스레드들 중 어떤 순서로 접근 권한을 얻는지 알 수 없습니다.
멤버 함수
생성자와 소멸자를 제외한 public 인 멤버 함수들에 대해서 알아보겠습니다.
void lock()
기본 잠금으로 쓰기 접근 권한을 획득하고 잠급니다. 이미 다른 스레드에서 접근 권한을 획득하여 잠겨 있다면 해제될 때까지 기다립니다.
이 함수로 잠길 경우, lock() 과 읽기 접근 권한 획득 함수인 lock_read() 으로 접근 권한을 획득하려 하더라도 획득할 수 없고, 잠금이 해제될 때까지 기다리게 됩니다.
bool try_lock()
쓰기 접근 권한 획득을 시도합니다. 이미 다른 스레드가 접근 권한을 획득하여 잠겼을 경우에는 false 를 반환하고, 접근 권한을 획득하게 되면 true 를 반환합니다. 즉, 대기하지 않습니다.
void lock_read()
읽기 잠금으로 읽기 접근 권한 획득을 합니다. 다른 스레드에서 lock_read() 로 읽기 접근 권한 획득을 시도하면 이미 다른 스레드가 읽기 접근 권한을 획득하였다고 하더라도 동시에 읽기 접근 권한을 획득할 수 있습니다.
다른 스레드에서 lock() 으로 쓰기 접근 권한 획득을 시도한다면, 이 스레드는 모든 읽기 권한으로 인한 잠금이 해제될 때까지 기다리게 됩니다.
bool try_lock_read()
읽기 접근 권한 획득을 시도합니다. 이미 다른 접근 권한을 획득하여 잠겼을 경우에는 false 를 반환하고, 접근 궈한을 획득하게 되면 true 를 반환합니다. 즉, 대기하지 않습니다.
void unlock()
어떤 잠금이든 해제합니다.
범위 잠금
critical_section 과 마찬가지로 범위 잠금을 지원합니다.
reader_writer_lock::scoped_lock
critical_section 과 같습니다. 다른 점이 있다면 이 잠금 객체는 쓰기 잠금의 범위 잠금 객체입니다.
reader_writer_lock:scoped_lock_read
이 잠금 객체는 읽기 잠금의 범위 잠금 객체입니다.
마치는 글
Concurrency Runtime 에서 제공하는 동기화 객체은 critical_section 과 reader_writer_lock 에 대해서 알아보았습니다.
reader_writer_lock 이 동시 읽기 접근이 가능하므로 critical_section 보다는 상황에 따라 성능이 좋을 수 있습니다.
Asynchronous Agents Library 는 위 그림에서 보듯이 Concurrency Runtime 프레임워크 안에서 돌아가는 내부 컴포넌트 중 하나입니다.
라이브러리라는 명칭에서 독립적으로 수행될 것 같은 오해를 가질 수 있으나, 사실은 Concurrency Runtime 의 Task Scheduler 와 Resource Manager 를 기반으로 만들어졌습니다.
Concurrency Runtime 에서 Task Scheduler 와 Resource Manager 를 하위 레벨 컴포넌트라고 본다면, Asynchronouse Agent Library( 이하, AAL ) 와 Parallel Patterns Library( 이하, PPL ) 은 상위 레벨 컴포넌트라고 볼 수 있습니다. 다시 말해, AAL, PPL 모두 Task Scheduler 와 Resource Manager 를 사용한다는 말입니다.
Concurrency Runtime 이 아닌 다른 프레임워크에서도 하위 레벨보다 상위 레벨 컴포넌트가 사용 용이성과 안정성이 높은 것처럼 Concurrency Runtime 에서도 마찬가지입니다.
Concurrency Runtime 개발자들은 비 동기 작업들을 하기 위해서는 AAL, 단위 작업의 병렬 처리를 하기 위해서는 PPL 을 사용하기를 권하고, 좀 더 특별한 최적화나 하위 레벨 작업이 필요한 경우에만 직접 Task Scheduler 를 사용하기를 권하고 있습니다.
즉, AAL 은 Concurrency Runtime 의 비 동기 작업을 위한 인터페이스라고 볼 수 있습니다.( PPL 은 병렬 처리를 위한 인터페이스라고 볼 수 있습니다. )
비 동기 작업
비 동기 작업이란 작업이 끝날 때까지 기다리지 않는다는 말입니다. 즉, 어떤 함수가 있을 때, 그 함수는 바로 반환되어야 합니다. 바로 반환되면 호출한 스레드는 바로 다른 작업을 수행할 수 있습니다. 이 때, 해당 함수의 작업은 호출한 스레드가 아니라 다른 작업 스레드에서 수행되고, 작업이 완료되면 호출한 스레드에게 통지하거나, 콜백( call back ) 함수를 호출합니다.
이러한 처리 방식을 비 동기 처리 방식이라고 합니다. AAL 은 이런 비 동기 작업을 용이하게 해주는 라이브러리입니다.
기능
AAL 은 크게 두 가지 특징을 가지고 있습니다. 하나는 actor-based 프로그래밍 모델이고, 다른 하나는 message passing 프로그래밍 모델입니다.
Agent
Agent 는 AAL 의 기능 중 하나인 actor-based 프로그래밍 모델의 actor 를 나타내는 개념입니다. 인공 지능 등과 같은 다른 분야에서 사용하는 개념과 마찬가지로 어떤 임무를 수행하는, 즉 어떤 역할을 하는 객체를 일컫습니다.
예를 들어, 스마트 폰이 만들어 지려면 케이스( case ) 생산, 하드웨어 생산, 케이스와 하드웨어 조립, OS 및 소프트웨어 설치, 테스트와 같은 공정이 필요한데, 각 공정들을 수행하는 객체들을 agent 라고 볼 수 있습니다.
Message
Message 는 agent 들 간의 통신을 위한 메커니즘입니다. AAL 에서의 message 란 상황이나 상태를 알리기 위한 수단이 될 수도 있고, 실제 데이터를 전송하기 위한 수단이 될 수도 있습니다.
예를 들면, 스마트 폰 공정들 사이에 케이스가 생산되었다면 생산된 케이스를 조립하는 곳으로 전달해야 합니다. 이 때, 케이스를 전달하는 수단으로 message 가 사용될 수 있습니다. 또한 케이스와 하드웨어의 조립, OS 및 소프트웨어 설치가 완료되어야지 테스트를 진행할 수 있습니다. 이런 완료 상태와 같은 작업의 상태를 알리기 위해서 message 가 사용될 수 있습니다.
예제
위에 언급한 스마트 폰 생산 과정을 AAL 을 이용하여 구현해보았습니다. 예제가 비교적 긴 편이지만 최대한 이해하기 쉽게 직관적인 코드를 작성했습니다. AAL 을 중점적으로 소개하기 위해 디자인이나 테크닉은 무시하고 작성하였습니다.
전체적인 흐름을 이해하는 데에는 어려움이 없을 것 같으나 사용된 함수들이 어떤 역할을 하는지 궁금할 것입니다.
시나리오
스마트 폰 케이스 생산자와 하드웨어 생산자가 각각 케이스와 하드웨어를 부품으로 생산한다.
생산된 부품을 조립하는 객체에게 전달하면 조립하는 객체가 조립하게 된다.
조립된 스마트 폰은 소프트웨어 설치 객체에게 전달되고, 그 객체는 소프트웨어를 설치한다.
소프트웨어가 설치된 스마트 폰은 테스터에게 전달되고, 테스터는 테스트에 성공한 스마트 폰을 제품 컨테이너에 저장한다.
제품 컨테이너의 제품들을 출력하고 종료한다.
[ 그림2. 예제 시나리오 ]
코드
// 스마트폰 생산 예제 코드
#include <iostream>
#include <agents.h>
#include <ppl.h>
using namespace std;
using namespace Concurrency;
// 케이스 클래스
class Case
{
};
// 하드웨어 클래스
class Hardware
{
};
// 소프트웨어 클래스
class Software
{
};
// 스마트폰 클래스
class SmartPhone
{
public:
int uid;
Case* pCase;
Hardware* pHardware;
Software* pSoftware;
~SmartPhone()
{
if( 0 != this->pCase )
{
delete this->pCase;
this->pCase = 0;
}
if( 0 != this->pHardware )
{
delete this->pHardware;
this->pCase = 0;
}
if( 0 != this->pSoftware )
{
delete this->pSoftware;
this->pSoftware = 0;
}
}
SmartPhone( int _uid, Case* _pCase = 0, Hardware* _pHardware = 0, Software* _pSoftware = 0 )
: uid( _uid )
, pCase( _pCase )
, pHardware( _pHardware )
, pSoftware( 0 )
{
if( 0 != _pSoftware )
this->pSoftware = new Software( *_pSoftware );
}
void SetSoftware( Software* _pSoftware )
{
if( 0 != this->pSoftware )
delete this->pSoftware;
this->pSoftware = new Software( *_pSoftware );
}
};
// 메시지 버퍼 typedef
typedef unbounded_buffer< Case* > CaseBuffer;
typedef unbounded_buffer< Hardware* > HardwareBuffer;
typedef unbounded_buffer< SmartPhone* > SmartPhoneBuffer;
typedef vector< SmartPhone* > SmartPhoneVector;
// 케이스 생산자 agent
class CaseProducer
: public agent
{
private:
unsigned int caseCountToCreate;
CaseBuffer& caseBuffer;
protected:
void run()
{
// 필요한 개수 만큼 케이스를 생산하고 케이스 버퍼로 보낸다.
for( unsigned int i = 0; i < this->caseCountToCreate; ++i )
{
send( this->caseBuffer, new Case );
wcout << i << L": created a case." << endl;
}
// 모두 보냈으면 생산을 완료했다는 메시지로 널( 0 ) 포인터를 보낸다.
send( this->caseBuffer, static_cast< Case* >( 0 ) );
done();
}
public:
CaseProducer( unsigned int _caseCountToCreate, CaseBuffer& _caseBuffer )
: caseBuffer( _caseBuffer )
, caseCountToCreate( _caseCountToCreate ) { }
};
// 하드웨어 생산자 agent
class HardwareProducer
: public agent
{
private:
unsigned int hardwareCountToCreate;
HardwareBuffer& hardwareBuffer;
protected:
void run()
{
// 필요한 개수 만큼 하드웨어를 생산하고 하드웨어 버퍼로 보낸다.
for( unsigned int i = 0; i < this->hardwareCountToCreate; ++i )
{
send( this->hardwareBuffer, new Hardware );
wcout << i << L": created a hardware." << endl;
}
// 모두 보냈으면 생산을 완료했다는 메시지로 널( 0 ) 포인터를 보낸다.
send( this->hardwareBuffer, static_cast< Hardware* >( 0 ) );
done();
}
public:
HardwareProducer( unsigned int _hardwareCountToCreate, HardwareBuffer& _hardwareBuffer )
: hardwareCountToCreate( _hardwareCountToCreate )
, hardwareBuffer( _hardwareBuffer ) { }
};
// 부품( 케이스와 하드웨어)을 조립하는 agent
class Assembler
: public agent
{
private:
CaseBuffer& caseBuffer;
HardwareBuffer& hardwareBuffer;
SmartPhoneBuffer& incompletedSmartPhoneBuffer;
protected:
void run()
{
unsigned int loopCount = 0;
// 버퍼에 쌓인 부품( 케이스와 하드웨어 )을 꺼내 조립하여 스마트폰을 만든다.
while( true )
{
Case* pCase = receive( this->caseBuffer );
Hardware* pHardware = receive( this->hardwareBuffer );
// 더 이상 조립할 부품( 케이스 또는 하드웨어 )들이 없으면,
// 스마트폰 생산이 완료되었음을 알리는 메시지로 널( 0 ) 포인터를 보낸다.
if( 0 == pCase || 0 == pHardware )
{
// 남은 케이스를 파괴한다.
if( 0 != pCase )
{
while( true )
{
Case* pGarbageCase = receive( this->caseBuffer );
if( 0 == pGarbageCase )
break;
delete pGarbageCase;
}
}
// 남은 하드웨어를 파괴한다.
if( 0 != pHardware )
{
while( true )
{
Hardware* pGarbageHardware = receive( this->hardwareBuffer );
if( 0 == pGarbageHardware )
break;
delete pGarbageHardware;
}
}
send( this->incompletedSmartPhoneBuffer, static_cast< SmartPhone* >( 0 ) );
break;
}
send( this->incompletedSmartPhoneBuffer,
new SmartPhone( loopCount + 1, pCase, pHardware ) );
wcout << loopCount << L": created a smart phone." << endl;
++loopCount;
}
done();
}
public:
Assembler( CaseBuffer& _caseBuffer, HardwareBuffer& _hardwareBuffer,
SmartPhoneBuffer& _incompletedSmartPhoneBuffer )
: caseBuffer( _caseBuffer )
, hardwareBuffer( _hardwareBuffer )
, incompletedSmartPhoneBuffer( _incompletedSmartPhoneBuffer ) { }
};
// 소프트웨어 설치 agent
class SoftwareInstaller
: public agent
{
private:
Software& software;
SmartPhoneBuffer& incompletedSmartPhoneBuffer;
SmartPhoneBuffer& completedSmartPhoneBuffer;
SmartPhoneVector& faultySmartPhoneArray;
protected:
void run()
{
unsigned int loopCount = 0;
// 조립된 스마트폰에 소프트웨어를 설치한다.
while( true )
{
SmartPhone* pSmartPhone = receive( this->incompletedSmartPhoneBuffer );
// 더 이상 소프트웨어를 설치할 조립된 스마트폰이 없으면, 설치를 중단하고
// 소프트웨어 설치도 모두 완료되었음을 알리는 메시지로 널( 0 ) 포인터를 보낸다.
if( 0 == pSmartPhone )
{
send( this->completedSmartPhoneBuffer, static_cast< SmartPhone* >( 0 ) );
break;
}
wcout << loopCount;
// 제대로 조립되었는지 판단 후, 소프트웨어를 설치한다.
if( 0 != pSmartPhone->pCase && 0 != pSmartPhone->pHardware )
{
pSmartPhone->SetSoftware( &this->software );
send( this->completedSmartPhoneBuffer, pSmartPhone );
wcout << L": installed the software." << endl;
}
else
{
this->faultySmartPhoneArray.push_back( pSmartPhone );
wcout << L": failed to install the software." << endl;
}
++loopCount;
}
done();
}
public:
SoftwareInstaller( Software& _software, SmartPhoneBuffer& _incompletedSmartPhoneBuffer,
SmartPhoneBuffer& _completedSmartPhoneBuffer, SmartPhoneVector& _faultySmartPhoneArray )
: software( _software )
, incompletedSmartPhoneBuffer( _incompletedSmartPhoneBuffer )
, completedSmartPhoneBuffer( _completedSmartPhoneBuffer )
, faultySmartPhoneArray( _faultySmartPhoneArray ) { }
};
// 테스트하는 agent
class Tester
: public agent
{
private:
SmartPhoneBuffer& completedSmartPhoneBuffer;
SmartPhoneVector& productArray;
SmartPhoneVector& faultySmartPhoneArray;
protected:
void run()
{
unsigned int loopCount = 0;
// 조립된 스마트폰에 소프트웨어 설치가 완료된 스마트폰을 테스트한다.
while( true )
{
SmartPhone* pSmartPhone = receive( this->completedSmartPhoneBuffer );
// 더 이상 테스트할 스마트폰이 없으면 중단한다.
if( 0 == pSmartPhone )
break;
wcout << loopCount;
if( this->Test( pSmartPhone ) )
{
this->productArray.push_back( pSmartPhone );
wcout << L": succeeded in testing." << endl;
}
else
{
this->faultySmartPhoneArray.push_back( pSmartPhone );
wcout << L": failed to test." << endl;
}
++loopCount;
}
done();
}
public:
Tester( SmartPhoneBuffer& _completedSmartPhoneBuffer,
SmartPhoneVector& _productArray, SmartPhoneVector& _faultySmartPhoneArray )
: completedSmartPhoneBuffer( _completedSmartPhoneBuffer )
, productArray( _productArray )
, faultySmartPhoneArray( _faultySmartPhoneArray ) { }
bool Test( SmartPhone* _pSmartPhone )
{
return ( 0 != _pSmartPhone->pCase && 0 != _pSmartPhone->pHardware
&& 0 != _pSmartPhone->pSoftware );
}
};
int main()
{
// 케이스 생산자 agent 생성.
int caseCountToCreate = 10;
CaseBuffer caseBuffer;
CaseProducer caseProducer( caseCountToCreate, caseBuffer );
// 하드웨어 생산자 agent 생성.
int hardwareCountToCreate = 10;
HardwareBuffer hardwareBuffer;
HardwareProducer hardwareProducer( hardwareCountToCreate, hardwareBuffer );
// 부품( 케이스와 하드웨어 )을 조립하는 agent 생성.
SmartPhoneBuffer incompletedSmartPhoneBuffer;
Assembler assembler( caseBuffer, hardwareBuffer, incompletedSmartPhoneBuffer );
// 소프트웨어 설치 agent 생성.
SmartPhoneVector faultySmartPhoneArray;
SmartPhoneBuffer completedSmartPhoneBuffer;
SmartPhoneBuffer faultySmartPhoneBuffer;
Software android;
SoftwareInstaller softwareInstaller( android, incompletedSmartPhoneBuffer,
completedSmartPhoneBuffer, faultySmartPhoneArray );
// 테스트하는 agent 생성.
SmartPhoneVector productArray;
SmartPhoneBuffer productBuffer;
Tester tester( completedSmartPhoneBuffer, productArray, faultySmartPhoneArray );
// 모든 생성된 agent 작업 시작.
caseProducer.start();
hardwareProducer.start();
assembler.start();
softwareInstaller.start();
tester.start();
// 모든 agent 의 작업이 끝날 때까지 대기.
agent* watingAgents[] = { &caseProducer, &hardwareProducer, &assembler,
&softwareInstaller, &tester };
agent::wait_for_all( sizeof( watingAgents ) / sizeof( agent* ), watingAgents );
agent::wait( &caseProducer );
agent::wait( &hardwareProducer );
agent::wait( &assembler );
agent::wait( &softwareInstaller );
agent::wait( &tester );
// 완성된 제품 출력.
wcout << L"completed products: " << endl;
parallel_for_each( productArray.begin(), productArray.end(),
[] ( SmartPhone* _pSmartPhone )
{
wcout << L"product uid: " << _pSmartPhone->uid << endl;
});
// 자원 정리 - 완제품.
parallel_for_each( productArray.begin(), productArray.end(),
[] ( SmartPhone* _pSmartPhone )
{
if( 0 != _pSmartPhone )
delete _pSmartPhone;
});
// 자원 정리 - 불량품.
parallel_for_each( faultySmartPhoneArray.begin(), faultySmartPhoneArray.end(),
[] ( SmartPhone* _pSmartPhone )
{
if( 0 != _pSmartPhone )
delete _pSmartPhone;
});
}
[ 코드1. 예제 코드 ]
실행 결과
모든 agent 가 수행하는 작업이 비 동기적으로 호출되었고 결과를 기다립니다. 비 동기적으로 호출된 agent 의 작업들은 각각 독립적인 스레드에서 동시에 수행되며, 작업을 수행하는데 데이터가 필요하다면 메시지에 의해 필요한 데이터가 도착할 때까지 동기화되어 수행됩니다.
wcout 객체는 동기화되지 않기 때문에 작업의 넘버링이 엉켜있는데 실제로 세어보면 정확히 맞아 떨어집니다.
[ 그림3. 코드1 실행 결과 ]
메모리 누수
코드에 명시적인 메모리 누수가 없어도 Concurrency Runtime 에 의한 메모리 누수가 발견됩니다. 이것은 개발팀에서도 버그라고 인정하고 있습니다. 하지만 이번 2010 버전에는 수정되지 않을 것이고, 다음 버전인 서비스팩에 수정되어 포함될 것이라고 합니다.
암묵적으로 스케줄러를 정의했다면 프로그램이 종료될 때 깔끔하게 해제되어야 하는데 이 부분이 매끄럽지 못한 것이
아쉽습니다. 현재 관련 개발자는 이번 버전에서는 깔끔하게 처리하는 부분을 미처 넣지 못했지만 꼭 다음
버전(VC++ 11)에서는 꼭 해결하겠다고 이야기 합니다.
ps : 이것은 4월15일 세션에서 제가 언급하였습니다만 블로그에는 포스팅을 늦게 하게 되었습니다.
concurrent_queue는 사용 용도가 concurrent_vector 보다 더 많을 것 같아서 좀 더 자세하게 설명하겠습니다.
온라인 서버 애플리케이션의 경우 ‘Producer-Consumer 모델’이나 이와 비슷한 모델로 네트웍을 통해서 받은 패킷을 처리합니다. 즉
스레드 A는 네트웍을 통해서 패킷을 받으면 Queue에 넣습니다. 그리고 스레드 B는 Queue에서
패킷을 꺼내와서 처리합니다. 이 때 Queue는 스레드 A와 B가 같이 사용하므로 공유 객체입니다. 공유 객체이므로 패킷을 넣고 뺄 때 크리티컬섹션과 같은 동기 객체로 동기화를 해야 합니다. 이런 곳에 concurrent_queue를 사용하면 아주 좋습니다.
concurrent_queue를
사용하기 위한 준비 단계
너무 당연하듯이 헤더 파일과 네임스페이스를 선언해야 합니다.
헤더파일
#include <concurrent_queue.h>
네임스페이스
using namespace Concurrency;
을 선언합니다.
이제 사전 준비는 끝났습니다. concurrent_queue를 선언한
후 사용하면 됩니다.
concurrent_queue< int > queue1;
concurrent_queue에 데이터 추가
concurrent_queue에 새로운 데이터를 넣을 때는push라는 멤버를 사용합니다.
원형
void
push( const _Ty& _Src );
STL의 deque의 push_back과 같은 사용 방법과 기능도 같습니다. 다만 스레스
세이프 하다는 것이 다릅니다. concurrent_queue는 앞 회에서 이야기 했듯이 스레드 세이프한
컨테이너이므로 제약이 있습니다. 그래서 deque 와
다르게 제일 뒤로만 새로운 데이터를 넣을 수 있습니다.
concurrent_queue< int > queue1;
queue1.push( 11 );
concurrent_queue에서
데이터 가져오기
데이터를 가져올 때는try_pop멤버를 사용합니다. 앞의 push의 경우는 STL의 deque와 비슷했지만 try_pop은 꽤 다릅니다.
원형
bool
try_pop( _Ty& _Dest );
try_pop을 호출 했을 때
concurrent_queue에 데이터가 있다면 true를 반환하고 _Dest에 데이터가 담기며 concurrent_queue에 있는
해당 데이터는 삭제됩니다. 그러나 concurrent_queue에
데이터가 없다면 false를 즉시 반환하고 _Dest에는
호출했을 때의 그대로 됩니다.
concurrent_queue< int > queue1;
queue1.push( 12 );
queue1.push( 14 );
int Value = 0;
if( queue1.try_pop( Value ) )
{
//
queue1에서 데이터를 가져왔음
}
else
{
//
queue1은 비어 있었음.
}
concurrent_queue가
비어 있는지 검사
concurrent_queue가 비어 있는지 알고 싶을 때는empty()를 사용합니다. 이것은
STL의 deque와 같습니다.
원형
bool
empty() const;
비어 있을 때는 true를 반환하고 비어 있지 않을 때는 false를 반환합니다. 다만
empty를 호출할 때 비어 있는지 검사하므로 100% 정확하지 않습니다. 100% 정확하지 않다라는 것은 empty와 push, try_pop 이 셋은 스레드 세이프하여 동시에 사용될 수 있으므로
empty를 호출할 시점에는 데이터가 있어서 false를 반환했지만 바로 직후에 다른 스레드에서 try_pop으로 삭제를 해버렸다면 empty 호출 후 false를 반환했어 try_pop을 호출했는데 false가 반환 될 수 있습니다.
concurrent_queue에 있는 데이터의 개수를 알고 싶을 때
concurrent_queue에 있는 데이터의 개수를 알고 싶을 때는unsafe_size멤버를 사용합니다.
원형
size_type
unsafe_size() const;
이것은 이름에서도 알 수 있듯이 스레드 세이프 하지 않습니다. 그래서 unsafe_size를 호출할 때 push나 try_pop이 호출되면 unsafe_size를 통해서 얻은 결과는
올바르지 않습니다.
concurrent_queue에
있는 데이터 순차 접근
concurrent_queue에 있는 데이터를 모두 순차적으로 접근하고
싶을 때는unsafe_begin과 unsafe_end를
사용합니다.
원형
iterator
unsafe_begin();
const_iterator
unsafe_begin() const;
iterator
unsafe_end();
const_iterator
unsafe_end() const;
unsafe_begin을 사용하여 선두 위치를 얻고, unsafe_end를 사용하여 마지막 다음 위치(미 사용 영역)를 얻을 수 있습니다. 이것도 이름에 나와 있듯이 스레드 세이프 하지
않습니다.
모든 데이터 삭제
모든 데이터를 삭제할 때는clear를 사용합니다. 이것은 이름에 unsafe라는 것이 없지만 스레드 세이프 하지 않습니다.
원형
template< typename _Ty, class _Ax >
void
concurrent_queue<_Ty,_Ax>::clear();
제 글을 보는 분들은 C++을 알고 있다는 가정하고 있기 때문에 STL을 알고 있다고 생각하여 아주 간단하게 concurrent_queue를
설명 하였습니다.
concurrent_queue 정말 간단하지 않습니까? 전체적으로 STL의 deque와
비슷해서 어렵지 않을 것입니다. 다만 스레드 세이프 하지 않은 것들이 있기 때문에 이것들을 사용할 때는
조심해야 된다는 것만 유의하면 됩니다.
이것으로 Concurrency Runtime의 PPL에 대한 설명은 일단락 되었습니다.
이후에는 Concurrency Runtime의 다른 부분을 설명할지
아니면 Beta2에서 새로 추가된 C++0x의 기능이나 또는
이전에 설명한 것들을 더 깊게 설명할지 고민을 한 후 다시 찾아 뵙겠습니다.^^
concurrent_queue는 queue
자료구조와 같이 앞과 뒤에서 접근할 수 있습니다.
concurrent_queue는 스레드
세이프하게 enqueue와 dequeue(queue에 데이터를
넣고 빼는) 조작을 할 수 있습니다.
또 concurrent_queue는 반복자를 지원하지만 이것은 스레드
세이프 하지 않습니다.
concurrent_queue와 queue의 차이점
concurrent_queue와 queue는 서로 아주 비슷하지만 다음과 같은 다른 점이 있습니다.
( 정확하게는 concurrent_queue와 STL의 deque와의 차이점 이라고 할수 있습니다. )
- concurrent_queue는
enqueue와 dequeue 조작이 스레드 세이프 하다.
- concurrent_queue는 반복자를 지원하지만 이것은 스레드
세이프 하지 않다.
- concurrent_queue는
front와 pop 함수를 지원하지 않는다.
대신에 try_pop 함수를 대신해서 사용한다.
- concurrent_queue는
back 함수를 지원하지 않는다.
그러므로 마지막 요소를 참조하는 것은 불가능하다.
- concurrent_queue는
size 메소드 대신 unsafe_size 함수를 지원한다.
unsafe_size는 이름 그대로 스레드 세이프 하지 않다.
스레드 세이프한 concurrent_queue의 함수
concurrent_queue에
enqueue 또는 dequeue 하는 모든 조작에 대해서는 스레드 세이프합니다.
- empty
- push
- get_allocator
- try_pop
empty는 스레드 세이프하지만 empty
호출 후 반환되기 전에 다른 스레드에 의해서 queue가 작아지던가 커지는 경우 이 동작들이
끝난 후에 empty의 결과가 반환됩니다.
스레드 세이프 하지 않은 concurrent_queue의 함수
- clear
- unsafe_end
- unsafe_begin
- unsafe_size
반복자 지원
앞서 이야기 했듯이 concurrent_queue는 반복자를 지원하지만
이것은 스레드 세이프 하지 않습니다. 그래서 이것은 디버깅 할 때만 사용할 것을 추천합니다.
또 concurrent_queue의 반복자는 오직 앞으로만 순회할 수
있습니다.
concurrent_queue는 아래의 반복자를 지원합니다.
- operator++
- operator*
- operator->
concurrent_queue는 앞서 설명한 concurrent_vector와 같이 스레드 세이프한 컨테이너지만 STL의 vector와 deque에는 없는 제약 사항도 있습니다. 우리들이 Vector와 deque를
스레드 세이프하게 래핑하는 것보다는 Concurrency Runtime에서 제공하는 컨테이너가 성능적으로
더 좋지만 모든 동작이 스레드 세이프하지 않고 지원하지 않는 것도 있으니 조심해서 사용해야 합니다.
다음에는 일반적인 queue에는 없고 concurrent_queue에서만 새로 생긴 함수에 대해서 좀 더 자세하게 설명하겠습니다.
ps : 앞 주에 Intel의 TBB에 대한 책을 보았습니다. 전체적으로 Concurrency Runtime과 비슷한 부분이 많아서 책을 생각 외로 빨리 볼 수 있었습니다. 제 생각에 TBB나 Concurrency Runtime를 공부하면 다른 하나도 아주 빠르고 쉽게 습득할 수 있을 것 같습니다.
shink_to_fit는 메모리 사용량과 단편화를 최적화 시켜줍니다. 이것은 메모리 재할당을 하기 때문에 요소에 접근하는 모든 반복자가 무효화됩니다.
Intel TBB
CPU로 유명한 Intel에서는
멀티코어 CPU를 만들면서 병렬 프로그래밍을 좀 더 쉽고, 안전화고, 확장성 높은 프로그램을 만들 수 있도록 툴과 라이브러리를 만들었습니다.
라이브러리 중 TBB라는 병렬 프로그래밍 용 라이브러리가 있습니다. 아마 TBB를 아시는 분이라면
Concurrent Runtime의 PPL에 있는 것들이
TBB에 있는 것들과 비슷한 부분이 많다라는 것을 아실 것입니다.
VSTS 2010 Beta2가 나온지 얼마 되지 않아서 병렬 컨테이너에
대한 문서가 거의 없습니다. 그러나 TBB에 관한 문서는
검색을 해보면 적지 않게 찾을 수 있습니다. concurrent_vector에 대해서 좀 더 알고 싶은
분들은 Intel의 TBB에 대해서 알아보시면 좋을 것 같습니다.
Visual Stuido 2010 Beta2가 나오면서 제가 기대하고
있었던 병렬 컨테이너가 드디어 구현되었습니다.
Concurrency Runtime(이하 ConRT)에는 총 3개의 병렬 컨테이너를 제공합니다. Beta2에서는 모두 다 구현되지는 못하고concurrent_vector와 concurrent_queue 두 개가 구현되었습니다. 아직 구현되지
않은 것은 concurrent_hash_map입니다.
세 개의 컨테이너들은 C++ STL의 컨테이너 중에서 가장 자주 사용하는
것으로 vector, deque, hash_map 컨테이너의 병렬 버전이라는 것을 이름을 보면 쉽게
알 수 있을 것입니다.
STL에 있는 컨테이너와 비슷한 이름을 가진 것처럼 사용 방법도 기존의
컨테이너와 비슷합니다. 다만 병렬 컨테이너 답게 스레드 세이프하며, 기존의
컨테이너에서 제공하는 일부 기능을 지원하지 못하는 제한도 있습니다.
몇 회에 나누어서 concurrent_vector와 concurrent_queue에 대해서 설명해 나가겠습니다.
이번에는 첫 번째로 concurrent_vector에 대한 것입니다.
concurrent_vector란?
STL의 vector와
같이 임의 접근이 가능한 시퀀스 컨테이너입니다. concurrent_vector는 멀티 스레드에서 요소를
추가하던가 특정 요소에 접근해도 안전합니다. 반복자의 접근과 순회는 언제나 멀티 스레드에서 안전해야
하므로 요소를 추가할 때는 기존의 인덱스와 반복자를 무효화 시키면 안됩니다.
concurrent_vector와
vector의 차이점
기능
vctor
Concurrent_vector
추가
스레드에 안전하지 않음
스레드에 안전
요소에 접근
스레드에 안전하지 않음
스레드에 안전
반복자 접근 및 순회
스레드에 안전하지 않음
스레드에 안전
push_back
가능
가능
insert
가능
불가능
clear
모두 삭제
모두 삭제
erase
가능
불가능
pop_back
가능
불가능
배열식 접근 예. &v[0]+2
가능
불가능
grow_by,
grow_to_at_least (vector의 resize와 비슷)는 스레드에 안전
추가 또는
resize 때 기존 인덱스나 반복자의 위치가 바뀌지 않음
bool 형은 정의 되지 않았음
concurrent_vector에 대한 설명을 이번에는 소개 정도로 끝내고 다음부터는 본격적으로 Concurrent_vector을 어떻게 사용하면 되는지 상세하게 설명해 나가겠습니다.^^
취소는 그것을 호출했을 때 즉시 동작하지 않습니다. task group이
취소되면 런타임은 각 task에 interruption point를
발동하여 런타임을 throw 시켜서 활동중인 task가 취소될
때 내부 예외 형을 잡을 수 있습니다. Concurrency Runtime은 런타임이 언제 interruption point를 호출할지 정의되어 있지 않습니다. 런타임이
취소를 할 때 던지는 예외를 잡아서 처리할 필요가 있습니다.
그래서 만약 task의 처리 시간이 긴 경우는 정기적으로 취소 여부를
확인할 필요가 있습니다.
< 리스트 4. >
auto t5 = make_task([&] {
//
Perform work in a loop.
for
(int i = 0; i < 1000; ++i)
{
// To reduce overhead, occasionally check for
// cancelation.
if ((i%100) == 0)
{
if (tg2.is_canceling())
{
wcout << L"The task was canceled." << endl;
break;
}
}
// TODO: Perform work here.
}
});
<리스트 4>의
굵게 표시한 코드는 task5가 정기적으로 task group인 tg2가 취소 되었는지 조사하고 있는 것입니다.
<리스트 4>는
명시적으로 task5가 속한 task group인 tg2가 취소되었는지 조사하고 있는데 만약 해당 task가 속한 task group을 직접적으로 호출하지 않고 취소 여부를 조사하고 싶을 때는 is_current_task_group_canceling()을 사용합니다.
4. 병렬 알고리즘에서의 취소
task group에서 사용하는 병렬 알고리즘도 위에서 소개한 방법으로
취소할 수 있습니다.
< 리스트 5. Task
group에서 병렬 알고리즘 사용 >
structured_task_group tg;
task_group_status status =
tg.run_and_wait([&] {
parallel_for(0, 100, [&](int i) {
// Cancel the task when i is 50.
if (i == 50)
{
tg.cancel();
}
else
{
// TODO: Perform work here.
}
});
});
// Print the task group status.
wcout << L"The task group status
is: ";
switch (status)
{
case not_complete:
wcout << L"not complete." << endl;
break;
case completed:
wcout << L"completed." << endl;
break;
case canceled:
wcout << L"canceled." << endl;
break;
default:
wcout << L"unknown." << endl;
break;
}
<리스트 5>는 task group인 tg에
task를 넣을 때 병렬 알고리즘을 넣었습니다. 그리고 이번 beta2에 새로 생긴run_and_wait멤버를 사용하여 task의 실행이 끝날 때 까지 대기하도록 했습니다(예전에는 run 이후에 wait를 호출해야 했습니다).
물론 cancel이 아닌 예외를 발생 시켜서 취소 시킬 수도 있습니다.
< 리스트 6. 병렬
알고리즘에서 예외를 발생시켜서 취소 시키기 >
try
{
parallel_for(0, 100, [&](int i) {
// Throw an exception to cancel the task when i is 50.
if (i == 50)
{
throw i;
}
else
{
// TODO: Perform work here.
}
});
}
catch (int n)
{
wcout << L"Caught " << n << endl;
}
<리스트 6>은 하나의 task만 예외를 발생시키고 있기 때문에 task group의 모든 task를 취소
시키기 위해서는 모든
task에서 예외를 발생시켜야 합니다.
그래서 아래의 <리스트 7>과 같이 전역 변수 flag를 사용합니다.
< 리스트 7. 모든 병렬 알고리즘의 task 취소 시키기 >
bool canceled
= false;
parallel_for(0,
100, [&](int i) {
// For illustration, set the flag to cancel
the task when i is 50.
if (i == 50)
{
canceled = true;
}
// Perform work if the task is not canceled.
if (!canceled)
{
// TODO: Perform work here.
}
});
5. Parallel 작업을 취소를 사용하지 못하는 경우
취소 작업은 모든 상황에서 다 사용할 수 있는 것은 아닙니다. 특정
시나리오에서는 사용하지 못할 수가 있습니다. 예를 들면 어떤 task는
활동중인 다른 task에 의해 block이 풀렸지만 아직
시작하기 전에 task group이 최소되어 버리면 계속 시작하지 못하여 결과적으로 애플리케이션이 dead lock 상황에 빠지게 됩니다.
이것으로 task group에서의 병렬 작업의 취소에 대한 것은 마칩니다. 다음에는 Beta2에 드디어 구현된 Concurrency Container에 대해서 설명하겠숩니다.
참고 url
MSDN : http://msdn.microsoft.com/en-us/library/dd984117(VS.100).aspx
task group을 사용하여 복수의 작업을 병렬적으로 처리할 때
모든 작업이 끝나기 전에 작업을 취소 해야 되는 경우가 있을 것입니다. task group에서
이와 같은 취소 처리를 어떻게 하는지 알아보겠습니다.
Concurrency Rumtime에 대한 정보는 아직까지는 MSDN을 통해서 주로 얻을 수 있기 때문에 거의 대부분
MSDN에 있는 것을 제가 좀 더 보기 좋고 쉽게 전달할 수 있도록 각색을 하는 정도이니 이미
MSDN에서 보신 분들은 pass 하셔도 괜찮습니다.^^;
1. 병렬 작업의 tree
PPL은 task group를
사용하여 병렬 작업을 세분화하여 각 작업을 처리합니다. 또 task
group에 다른 task group를 넣으면 이것을 부모와 자식으로 tree 구조로 표현할 수 있습니다.
< 리스트 1. >
structured_task_group tg1;
auto t1 = make_task([&] {
structured_task_group tg2;
//
Create a child task.
auto t4 = make_task([&] {
// TODO: Perform work here.
});
//
Create a child task.
auto t5 = make_task([&] {
// TODO: Perform work here.
});
//
Run the child tasks and wait for them to finish.
tg2.run(t4);
tg2.run(t5);
tg2.wait();
});
// Create a child task.
auto t2 = make_task([&] {
//
TODO: Perform work here.
});
// Create a child task.
auto t3 = make_task([&] {
//
TODO: Perform work here.
});
// Run the child tasks and wait for them to
finish.
tg1.run(t1);
tg1.run(t2);
tg1.run(t3);
<리스트 1>에서는 structured_task_group tg2가 tg1에 들어가서 tg2는 tg1의 자식이 되었습니다.
이것을 tree 그림으로 표현하면 아래와 같습니다.
< 그림 1. >
2. 병렬
작업의 취소 방법
parallel task를 취소할 때는task group의 cancel 멤버를 사용하면 됩니다(task_group::cancel, structured_task_group::cancel). 또 다른 방법으로는
task에서 예외를 발생시키는 것입니다. 두 가지 방법 중 cancel 멤버를 사용하는 것이 훨씬 더 효율적입니다.
cancel을 사용하는 것을 top-down
방식으로 task group에 속한 모든 task를
취소시킵니다. 예외를 발생 시켜서 취소하는 방법은 bottom-up 방식으로 task group에 있는 각 task에서 예외를 발생시켜서 위로
전파시킵니다.
cancel 멤버는 자식 task에서만
영향을 끼칩니다. 예를 들면 <그림 1>의t4에서 tg2를 cancel하면 tg2에 속한 t4, t5 task만 취소됩니다. 그러나tg1을 cancel하면
모든 task가 취소됩니다.
structured_task_group은 thread 세이프 하지 않기 때문에 자식 task에서 cancel을 호출하면 어떤 행동을 할지 알 수 없습니다. 자식 task는 cancel로 부모
task를 취소하던가 is_canceling로 취소 여부를 조사할 수 있습니다.
< 리스트 2. cancel을
사용하여 취소 >
auto t4 = make_task([&] {
//
Perform work in a loop.
for (int i = 0; i < 1000; ++i)
{
// Call a function to perform work.
// If the work function fails, cancel all tasks in the tree.
bool succeeded = work(i);
if (!succeeded)
{
tg1.cancel();
break;
}
}
});
2.2. 예외를 발생시켜 병렬 작업 취소
앞서 cancel 멤버를 사용하는 것 이외에 예외를 발생시켜서 취소
시킬 수 있다고 했습니다. 그리고 이것은 cancel()을
사용하는 것보다 효율이 좋지 않다고 했습니다.
예외를 발생시켜서 취소하는 방법의 예는 아래의 <리스트 3>의 코드를 보시면 됩니다.
< 리스트 3. 예외를
발생시켜서 취소 >
structured_task_group tg2;
// Create a child task.
auto t4 = make_task([&] {
//
Perform work in a loop.
for (int i = 0; i < 1000; ++i)
{
// Call a function to perform work.
// If the work function fails, throw an exception to
// cancel the parent task.
bool succeeded = work(i);
if (!succeeded)
{
throw exception("The task failed");
}
}
});
// Create a child task.
auto t5 = make_task([&] {
//
TODO: Perform work here.
});
// Run the child tasks.
tg2.run(t4);
tg2.run(t5);
// Wait for the tasks to finish. The
runtime marshals any exception
// that occurs to the call to wait.
try
{
tg2.wait();
}
catch (const exception& e)
{
wcout << e.what() << endl;
}
task_group이나
structured_task_group의 wait는 예외가 발생했을 때는 반환 값을 표시하지
못합니다. 그래서 <리스트 3>의 아래 부분에서 try-catch에서 exception을 통해서 상태를 표시하고 있습니다.
PPL에서 제공하는 알고리즘을 사용하여 병렬로 작업을 실행할 때 각 작업에서 접근하는 공유 리소스는 스레드 세이프 하지 않기 때문에 lock을 걸어서 공유 리소스를 보호해야 합니다.
그러나
lock을 건다는 것은 번거롭기도 하며 성능에 좋지 않은 영향을 미칩니다.
가장 좋은 방법은 공유 리소스에 lock을 걸지 않아도 스레드 세이프한 것이 가장 좋습니다.
combinable은 바로 위에 언급한 문제를 해결해 주는 것입니다. 모든 상황에 다 사용할 수 있는 것은 아니지만
특정 상황에서는combinable을 사용하면 lock을
걸지 않아도 공유 리소스를 스레드 세이프하게 접근 할 수 있습니다.
combinable
combinable은 병렬로 처리하는 작업에서 각 작업마다 계산을 실행한 후 그 계산 결과를 통합할 때 사용하는 재 사용
가능한 스레드 로컬 스트레지를 제공합니다.
combinable은 복수의 스레드 또는 태스크 간에 공유 리소스가 있는 경우에 사용하면 편리합니다. combinable는
공유 리소스의 접근을 각 스레드 별로 제공하여 공유 상태를 제거할 수 있습니다.
스레드 로컬 스트리지
스레드 프로그래밍을 공부하시면 스레드 고유의
로컬 스트리지를 만들어서 해당 스레드는 자신의 로컬 스트리지에 읽기,쓰기를 하여 다른 스레드와의 경합을
피하는 방법을 배울 수 있습니다.
combinable은 이 스레드 로컬 스트리지와 비슷한 방법입니다.
combinable의 메소드 설명
combinable::local : 현재 스레드 컨텍스트와 관련된 로컬 변수의 참조를 얻는다.
combinable::clear : 오브젝트로부터 모든 스레드 로컬 변수를 삭제한다.
combinable::combine : 제공하고 있는 있는 조합 함수를 사용하여 모드 스레드 로컬 계산의
set으로부터 최종적인 값을 만든다.
combinable::combinable_each ; 제공하고 있는 조합 함수를 사용하여 모든 스레드 로컬 계산의 set으로부터
최종적인 값을 만든다.
combinable은 최종 결합 결과 타입의 파라미터를 가지고 있는 템플릿 클래스입니다. 기본 생성자를 호출하면
기본 생성자와 복사 생성자 _Ty 템플릿 파라미터 형이 꼭 있어야합니다. _Ty 템플릿 파라미터 형이 기본 생성자를 가지지 않는 경우 파라미터로 초기화 함수를 사용하는 생성자로 오버로드
되어 있는 것을 호출합니다.
combinable을 사용하여 모든 작업에서 처리한 계산 결과 값을 얻을 때는 combine()을 사용하여
합계를 구하던가, combine_each를 사용하여 각 작업에서 계산한 값을 하나씩 호출하여
계산합니다.
< 예제 1. Combinable을 사용하지 않고 lock을
사용할 때 >
……
int
TotalItemPrice1 = 0;
critical_section
rt;
parallel_for( 1,
10000, [&]( int n ) {
rt.lock();
TotalItemPrice += n;
rt.unlock();
}
);
………
<예제 1>은 critical_section을 사용하여 TotalItemPrice 변수를 보호하고 있습니다.
그럼 <예제 1>을 combunable을 사용하여 구현해 보겠습니다.
< 예제 2. Combinable 사용 >
#include <ppl.h>
#include <iostream>
using namespace
Concurrency;
using namespace
std;
int main()
{
combinable<int>
ItemPriceSum;
parallel_for( 1, 10000, [&]( int
n ) {
ItemPriceSum.local()
+= n;
}
);
int TotalItemPrice = ItemPriceSum.combine( [](int left, int
right) {
parallel_invoke는 일련의 태스크를 병렬로 실행할 때 사용합니다. 그리고 모든 태스크가 끝날 때까지 대기합니다. 이 알고리즘은 복수의 독립된 태스크를 실행할 때 유용합니다.
‘일련의 태스크를 병렬로 실행할 때 사용’이라는 것을 들었을 때 생각나는 것이 없는가요? 지금까지 제가 올렸던 글을 보셨던 분이라면 parallel task라는
말이 나와야 합니다. ^^
네
parallel_invoke는 parallel task와 비슷합니다.
parallel_invoke와 parallel task의 다른 점
복수 개의 태스크를 병렬로 실행한다는 것은
둘 다 같지만 아래와 같은 차이점이 있습니다.
parallel_invoke
parallel task
편이성
작업 함수만 정의하면 된다.
작업 함수를 만든 후 task handle로 관리해야 한다.
태스크 개수
10개 이하만 가능
제한 없음
모든 태스크의 종료 시 대기
무조건 모든 태스크가 끝날 때까지 대기
Wait를 사용하지 않으면 대기 하지 않는다.
parallel_invoke를 사용할 때
병렬로 실행할 태스크의 개수가 10개 이하이고, 모든 태스크가 종료 할 때까지 대기해도 상관 없는
경우에는 간단하게 사용할 수 있는 parallel_invoke를 사용하는 것이 좋습니다. 하지만 반대로 병렬로 실행할 태스크가 10개를 넘고 모든 태스크의
종료를 대기하지 않아야 할 때는 parallel task를 사용해야 합니다.
parallel_invoke 사용 방법
parallel_invoke는 병렬로 태스크를 두 개만 실행하는 것에서 10개까지 실행하는 9개의 버전이 있으며 파라미터를 두 개만 사용하는 것에서 10개의 파라미터를
사용하는 것으로 오버로드 되어 있습니다.
각 오버로드된 버전의 파라미터에는 태스크를
정의한 작업 함수를 넘겨야 합니다.
parallel_invoke 사용 예
아래 예제는 아주 간단한 것으로 게임 프로그램이
처음 실행할 때 각종 파일을 로딩하는 것을 아주 간략화 하여 parallel_invoke를 사용한 예입니다.
#include <iostream>
#include <ctime>
#include <windows.h>
#include <concrt.h>
#include <concrtrm.h>
using namespace std;
#include <ppl.h>
using namespace Concurrency;
// UI 이미지 로딩
void LoadUIImage()
{
Sleep(1000);
cout
<< "Load Complete UI Image" << endl;
}
// 텍스쳐 로딩
void LoadTexture()
{
Sleep(1000);
cout
<< "Load Complete Texture" << endl;
}
// 폰트 파일 로딩
void LoadFont()
{
Sleep(1000);
cout
<< "Load Complete Font" << endl;
}
int main()
{
parallel_invoke(
[] { LoadUIImage(); },
[] { LoadTexture(); },
[] { LoadFont(); }
);
getchar();
return
0;
}
< 실행 결과 >
위 예제를 parallel_invoke를 사용하지 않고 전통적인 방법으로 순서대로 실행했다면 각 작업 함수에서 1초씩 소비하므로 3초가 걸리지만
parallel_invoke를 사용하여 1초만에 끝납니다.
그리고 이전에 parallel_for에서도 이야기 했듯이 병렬로 실행할 때는
순서가 지켜지지 않는다는 것을 꼭 유의하시기 바랍니다. 위의 예의 경우도 LoadUIImage()을 첫 번째 파라미터로 넘겼지만 실행 결과를 보면 LoadFont()가
먼저 완료 되었습니다.
마지막으로 위의 예제코드에서 parallel_invoke와 관계 있는 부분만 추려볼 테니 확실하게 사용 방법을 외우시기를 바랍니다.^^
for_each에 대해서 알고 있는 분들은 앞서 소개한 parallel_for 보다 더 쉽다고 느낄 것입니다. 기존의 for_each가 사용하는 파라미터도 같습니다. 기존에 사용했던 for_each를
parallel_for_each로 바꿀려면 알고리즘 이름만 바꾸어도 됩니다.
초 간단
parallel_for_each 사용 방법
1. 필요한 헤더 파일 포함
#include
<ppl.h>
2.네임 스페이스 선언
using namespace
Concurrency;
3. parallel_for_each에서 사용할 함수 정의
4. parallel_for_each에서 사용할 STL 컨테이너
정의
5. parallel_for_each 사용
parallel_for_each를 사용하는 간단한 예제
#include<iostream>
#include
<algorithm>
#include
<vector>
usingnamespace std;
#include
<ppl.h>
usingnamespace Concurrency;
int main()
{
vector<
int > ItemCdList(10);
generate( ItemCdList.begin(),
ItemCdList.end(), []() -> int {
위 예제는 vecter 컨테이너에 램덤으로 10개의
숫자 값을 채운 후 출력하는 것입니다.
for_each와 paralle_for_each 사용 방법이 이름만 다를
뿐 똑 같습니다.
위 예제를 ‘초 간단 parallel_for_each 사용
방법’의 순서에 비추어 보면 아래 그림과 같습니다.
위 예제의 결과입니다.
공유 자원 동기화 문제
parallel_for 때도 잠시 언급했듯이
parallel_for_each는 순서대로 실행하지 않고 병렬로 실행하므로 for_each를
사용한 것과 비교해 보면 출력 순서가 서로 다릅니다.
그리고 특히 문제가 되는 것이 공유 자원을 사용할 때 따로 동기화 시키지 않으면 원하지 않는 결과가 나옵니다.
위와 같은 잘못된 결과는 나올 수도 있고 안 나올 수도 있습니다. 즉 타이밍에 의해서 발생하는
것이기 때문입니다.이것이 병렬 프로그래밍의 어려움 중의 하나인데 에러가 언제나 발생하면 빨리 발견하여 처리할
수 있는데 공유 자원을 동기화 하지 않았을 때 발생하는 문제는 바로 발생할 수도 있고 때로는 여러 번 실행했을 때 간혹 나올 때도 있어서 버그
찾기에 어려움이 있습니다.
공유 자원의 동기화가 깨어지는 것을 막기 위해서는 동기화 객체를 사용하면 됩니다. 위 예제에서
두 번째 사용한 parallel_for_each는 ‘critical_section’이라는
동기화 객체를 사용하여 공유 자원을 안전하게 보호하고 있어서 올바르게 값을 출력하고 있습니다.
‘critical_section’에 대해서는 다음 기회에 자세하게 설명하겠습니다.
parallel_for_each에 대해서는 이것으로 마무리하고 다음 번에는 parallel_invoke에 대해서 설명하겠습니다.
파라미터 값을 보면 for에서 사용하는 것과 비슷하다는 것을 알 수 있을겁니다. 차이점은 첫 번째 버전의 경우 증분 값으로 1이 자동으로 사용된다는 것과 마지막 파리미터로 병렬 처리에 사용할 함수를 사용한다는 것입니다.
for와 비슷하므로 for를 사용하는 대 부분을 prarallel_for로 변경할 수 있습니다. 다만 parallel_for 알고리즘에서는 반복 변수의 현재 값이 _Last 보다 작으면 중단합니다( 보통 for 문과 다르게 ‘<’ 조건만 사용합니다 ).
또 _Index_type 입력 파라미터는 정수형이어야만 합니다.
parallel_for 파라미터가 1보다 작은 경우 invalid_argument_Step 예외를 던집니다.
초 간단 parallel_for 사용 방법
1. 필요한 헤더 파일 포함 #include <ppl.h>
2. 네임 스페이스 선언
using namespace Concurrency;
3. parallel_for에서 호출할 작업 함수 정의
4. parallel_for에서 사용할 data set 정의
5. parallel_for 사용
그럼 아주 간단한 실제 사용 예제 코드를 볼까요?
#include<ppl.h>
#include <iostream>
usingnamespace Concurrency;
usingnamespace std;
int main()
{
int CallNum = 0;
int Numbers[50] = { 0, };
parallel_for( 0, 50-1, [&]( int n ) {
++CallNum;
Numbers[n] += CallNum;
}
);
for( int i = 0; i < 50; ++i )
{
cout << i << " : " << Numbers[i] << endl;
}
getchar();
return 0;
}
위 예제는 Numbers라는 int 형 배열의 각 요소에 CallNum 이라는 변수를 더하는 것입니다. 간단하고 확실하게 parallel_for 사용 방법을 보이기 위해 허접한 예제를 만들게 되었음을 양해 바랍니다.^^;;; ( 다음에 기회가 되면 좀 더 멋지고 실용적인 예제를 보여드리도록 하겠습니다 )
예제에서는 코드를 간략화 하기 위해서 parallel_for의 마지막 파리미터로 람다 식을 사용했습니다.
위 예제를 '초 간단 parallel_for 사용 방법'의 순서에 비추어보면 아래 그림과 같습니다.
예제를 실행하면 아래와 같은 결과가 나옵니다.
(길어서 일부만 캡쳐 했습니다)
실행 결과를 보면Numbers 배열의 각 요소의 값이 순서대로 증가되지 않았다라는 것을 알 수 있습니다. 만약 보통의 for 문이라면 Numbers[0]은 1, Numbers[1]은 2 라는 값으로 됩니다. 그러나parallel_for는 병렬적으로 실행되므로 순서가 지켜지지 않습니다. 또 CallNum 라는 변수는 parallel_for의 모든 스레드에서 접근하는 공유 변수이므로 동기화 되지 않았다라는 것도 유의해야 합니다.
Parallel_for를 사용할 때 순서대로 실행하지 않고, 공유 변수는 동기화 되지 않음을 잊지마시기를 바랍니다.
이것으로 (너무?)간단하게 parallel_for에 대해서 알아 보았습니다. 다음에는 parallel_for_each에 대해서 설명하겠습니다.
Parallel Patterns Library(이하 PPL)에는 데이터 컬렉션을 대상으로 쉽게 병렬
작업을 할 수 있게 해 주는 알고리즘이 있습니다. 이 알고리즘들은 생소한 것들이 아니고 C++의 표준 템플릿 라이브러리(STL)에서 제공하는 알고리즘과 비슷한
모양과 사용법을 가지고 있습니다.
( *데이터 컬렉션은 데이터 모음으로 배열이나 STL 컨테이너를 생각하면 됩니다 )
PPL에서 제공하는 병렬 알고리즘은 총 세 개가 있습니다.
1. parallel_for 알고리즘
2. parallel_for_each 알고리즘
3. parallel_invoke 알고리즘
세 개의 알고리즘 중 3번 parallel_invoke만 생소하지 1번과 2번은 앞의 ‘parallel_’이라는
글자만 빼면 ‘for’와 ‘for_each’로 C++로 프로그래밍할 때 자주 사용하는 것이므로 친숙하게 느껴질 겁니다.
실제
병렬 여부만 제외하면 우리가 알고 있는 것들과 비슷한 동작을 합니다. 그래서 쉽게 배울 수 있고 기존의
코드에 적용하기도 쉽습니다.
parallel_for 알고리즘은 일반적인 for문을 사용할 때와 비슷하게 데이터 컬렉션에서 시작할 위치와 마지막
위치, 증가분(생략 가능합니다)에 해야할 작업 함수를 파라미터로 넘기면 됩니다. 사용
방법에서 for문과 다른 점은 작업 함수를 넘긴다는 점입니다.
parallel_for_each 알고리즘은 기존 for_each와 거의 같습니다. 데이터
컬렉션에서 시작할 위치, 마지막 위치, 작업 함수를 파라미터로 넘기면
됩니다. parallel_for의 경우 기존의 for문을
사용할 때는 작업 함수를 파라미터로 넘기지 않기 때문에 기존 for 문에 비해서 구조가 달라지지만 parallel_for_each는 기존 for_each와 파라미터
사용 방법이 같기 때문에 알고리즘의 이름만 바꾸면 될 정도입니다.
parallel_invoke 알고리즘은 이전 회에 설명한 태스크 그룹과 비슷한면이 있습니다. 태스크 그룹과의 큰 차이점은 병렬로
할수 있는 작업은 10개로 제한 되지만 사용 방법은 태스크 그룹보다 더 간결한 점입니다다. 병렬 작업의 개수가 10개 이하인 경우 태스크 그룹보다 parallel_invoke를 사용하는 것이 훨씬 더 적합하다고 생각합니다.
이번은 간단하게 PPL에 있는 세 가지 병렬 알고리즘을 소개하는 것으로 마칩니다. 다음
회부터는 이번에 소개했던 세 개의 알고리즘을 하나씩 하나씩 자세하게 설명하겠습니다.