Concurrency Runtime – Task Scheduler 4. ( ScheduleTask )

VC++ 10 Concurrency Runtime 2010. 10. 4. 08:30 Posted by 알 수 없는 사용자

Concurrency Runtime
– Task Scheduler 4. ( ScheduleTask )

작성자: 임준환( mumbi at daum dot net )

 

시작하는 글

 Concurrency Runtime 에서는 기존의 멀티스레드 프로그래밍할 때, 필수적인 스레드를 생성하는 함수( CreateThread(), _beginthread() 등 )와 같은 기능을 제공합니다.

이 글에서는 어떻게 위와 같은 기능을 제공하고, 어떻게 사용하면 되는지 알아보도록 하겠습니다.

 

ScheduleTask

 이 전에 설명했던 Parallel Patterns Library( 이하, PPL ) 이나 Asynchronous Agents Library( 이하, AAL ) 을 사용하게 되면 암묵적으로 스레드가 생성되고, 관리되기 때문에, 아주 간단한 작업을 처리할 때에는 비교적 높은 오버헤드( overhead )를 갖게 됩니다.

 그래서 Concurrency Runtime 에서는 간단한 작업들을 처리하기에 비교적 적은 오버헤드를 갖는 기능을 제공합니다.

 이 간단한 작업을 처리하는 방법은 기존의 스레드를 생성하는 방법과 유사합니다. 즉, 직접 스레드 코드를 제어하고 싶을 때, 사용할 수 있습니다.

 아쉬운 점은 간단한 작업의 처리를 완료했을 때, 알려주지 않습니다. 스레드 핸들을 사용하지 않기 때문에 WaitForSingleObject() 와 같은 함수도 사용할 수 없습니다.

 그렇기 때문에 스레드로 수행될 함수의 인자를 넘길 때, 스레드 종료를 알리는 메커니즘에 필요한 데이터를 포함해주어야 합니다.( 또는 전역적으로.. )

 Concurrency Runtime 에서는 위와 같이 스레드를 생성하는 기능을 제공하지만, 간단한 작업이 아니라면 PPL 또는 AAL 을 사용하는 것을 권하고 있습니다.

 이러한 기능은 ScheduleGroup::ScheduleTask(), CurrentScheduler::ScheduleTask(), Scheduler::ScheduleTask() 를 호출하여 사용합니다.

 

ScheduleGroup::ScheduleTask( TaskProc _Proc, void* _Data );

 일정 그룹 내에 간단한 작업을 추가합니다.

 첫 번째 매개변수의 타입인 TaskProc 는 다음과 같이 정의되어 있습니다.

typedef void (__cdecl * TaskProc)(void *)

[ 코드1. TaskProc 의 정의 ]

 그렇기 때문에 위와 같은 모양( signature ) 를 갖는 함수 포인터를 인자로 넘기면 됩니다.

두 번째 매개변수는 위에서 인자로 넘긴 함수 포인터의 인자로 넘길 데이터입니다.

 

CurrentScheduleer::ScheduleTask( TaskProc _Proc, void* _Data );

 호출하는 컨텍스트와 연결된( attached ) 스케줄러에 간단한 작업을 추가합니다.

매개변수의 내용은 위의 ScheduleGroup::ScheduleTask() 와 같습니다.

만약 호출하는 컨텍스트에 연결된 스케줄러가 없을 경우에는 기본 스케줄러에 추가합니다.

 

Scheduler::ScheduleTask( TaskProc _Proc, void* _Data );

 해당 스케줄러에 간단한 작업을 추가합니다.

매개변수의 내용은 위의 ScheduleGroup::ScheduleTask() 와 같습니다.

 

예제

 위에 언급된 내용과 함수들을 어떻게 사용하는 알아보도록 하겠습니다.

 

시나리오

 새로운 스레드를 생성하고, 그 스레드에서 인자로 전달 받은 구조체의 내용을 출력하는 내용입니다.

 

코드

#include <iostream>
#include <concrt.h>

using namespace std;
using namespace Concurrency;

void __cdecl MyThreadFunction( void* pParam );

// 스레드에서 사용할 데이터
struct MyData
{
	int val1;
    int val2;
    event signal;
};

int main()
{
	MyData* pData = new MyData;

	if( nullptr == pData )
		return 1;
	
	pData->val1 = 50;
	pData->val2 = 100;
	
	// _beginthreadex() 처럼 스레드를 생성하고 인자로 넘어간 함수를 생성된 스레드에서 수행한다.
	CurrentScheduler::ScheduleTask( MyThreadFunction, pData );

	// 수행 중인 작업이 끝날 때까지 대기.
	pData->signal.wait();

	// 작업이 끝난 후, 자원 해제.
	delete pData;	

	return 0;
}

[ 코드2. CurrentScheduler::ScheduleTask() 를 사용하여 스레드를 생성하는 예제 ]

 간단한 구조체를 생성하고, 스레드에서 수행될 함수에 전달합니다.

 스레드에서는 구조체의 정보를 출력합니다.

 스레드의 수행이 종료되기 전에 구조체를 제거하면 안되므로, event 객체를 이용해 스레드가 종료될 때까지 기다린 후, 제거합니다.

 

마치는 글

 Concurrency Runtime 을 사용 중에 간단하게 스레드를 생성해야 한다면, 기존의 스레드 생성 함수를 사용하는 것보다 일관성 있게 ScheduleTask 를 사용하는 것이 좋습니다.

 하지만 이 글에서 언급했듯이 간단하지 않은 작업이라면 PPL 이나 AAL 을 사용하는 것이 좋습니다.

 다음 글에서는 Scheduler 에서 제공하는 Context 에 대해서 알아보도록 하겠습니다.

Concurrency Runtime
– Task Scheduler 3. ( ScheduleGroup )

작성자: 임준환( mumbi at daum dot net )

 

시작하는 글

 Scheduler 는 항상 하나 이상의 스케줄 그룹을 가지고 있습니다. 스케줄 그룹이 무엇인지 알아보고 동시성 프로그래밍에 어떤 영향을 주는지 알아보도록 하겠습니다.

 

SchedulerGroup

 SchedulerGroup 이란 Scheduler 가 스케줄링 해야 할 작업들을 모아 놓은 스케줄 그룹을 대표하는 클래스입니다.

 

정책

 이전 글에서 본 스케줄러 정책들 중 SchedulingProtocol 정책은 스케줄링 순서를 변경하여 작업들의 처리 순서를 변경할 수 있습니다. ScheduleringProtocol 정책이 EnhanseScheduleGroupLocality 로 설정된 경우에는 스케줄링 중인 스케줄 그룹을 변경하지 않고, 현재 스케줄링 중인 스케줄 그룹을 다른 스케줄 그룹보다 먼저 처리합니다. 반면에 SchedulingProtocol 정책이 EnhanseForwardProgress 로 설정된 경우에는 다른 스케줄링 그룹의 작업을 처리합니다. 스케줄 그룹 간에 작업을 공평하게 처리하게 됩니다.

 

생성

 CurrentScheduler::CreateScheduleGroup() 또는 Scheduler::CreateScheduleGroup() 을 통해 ScheduleGroup 객체를 생성할 수 있습니다.

 Scheduler 가 참조 개수를 사용하여 수명을 관리하는 것처럼 ScheduleGroup 객체 또한 참조 개수를 사용합니다. 생성되는 순간 참조 개수는 1이 됩니다. SchedulerGroup::Reference() 는 참조 개수는 1이 증가하고, SchedulerGroup::Release() 는 참조 개수가 1이 감소하게 됩니다.

 

동작 및 사용

 Scheduler 객체에는 기본적으로 기본 ScheduleGroup 을 가지고 있습니다. 명시적으로 ScheduleGroup 을 생성하지 않는다면 스케줄링 해야 할 작업들은 기본 ScheduleGroup 에 추가됩니다.

 ScheduleGroup 은 Concurrency Runtime 중 Asynchronous Agents Library( 이하, AAL ) 에서 사용할 수 있습니다. agent 클래스나 message block 들의 생성자로 ScheduleGroup 을 전달하여 사용할 수 있습니다.

 

예제

 ScheduleGroup 을 어떻게 사용하는지 이해를 도울 예제를 살펴보겠습니다.

 

시나리오

 단순하게 어떤 스케줄 그룹의 작업들이 어떤 순서로 처리되는지 알아보는 예제입니다.

 

코드

#include <agents.h>
#include <vector>
#include <algorithm>
#include <iostream>
#include <sstream>

using namespace std;
using namespace Concurrency;

#pragma optimize( "", off )

void spin_loop()
{
	for( unsigned int i = 0; i < 500000000; ++i )
	{
	}
}

#pragma optimize( "", on )

class work_yield_agent
	: public agent
{
public:
	explicit work_yield_agent( unsigned int group_number, unsigned int task_number )
		: group_number( group_number )
		, task_number( task_number ) { }

	explicit work_yield_agent( ScheduleGroup& scheduleGroup, unsigned int group_number, unsigned int task_number )
		: agent( scheduleGroup )
		, group_number( group_number )
		, task_number( task_number ) { }

protected:
	void run()
	{
		wstringstream header, ss;

		header << L"agent no. " << this->group_number << L"-" << this->task_number << L": ";

		ss << header.str() << L"first loop..." << endl;
		wcout << ss.str();
		spin_loop();

		ss = wstringstream();
		ss << header.str() << L"waiting..." << endl;
		wcout << ss.str();
		Concurrency::wait( 0 );

		ss = wstringstream();
		ss<< header.str() << L"second loop..." << endl;
		wcout << ss.str();
		spin_loop();

		ss = wstringstream();
		ss << header.str() << L"finished..." << endl;
		wcout << ss.str();

		this->done();
	}

private:
	unsigned int	group_number;
	unsigned int	task_number;
};

void run_agents()
{
	const unsigned int group_count = 2;
	const unsigned int tasks_per_group = 2;

	vector< ScheduleGroup* > groups;
	vector< agent* > agents;

	for( unsigned int group_index = 0; group_index < group_count; ++group_index )
	{
		groups.push_back( CurrentScheduler::CreateScheduleGroup() );

		for( unsigned int task_index = 0; task_index < tasks_per_group; ++task_index )
		{
			agents.push_back( new work_yield_agent( *groups.back(), group_index, task_index ) );
		}
	}

	for_each( agents.begin(), agents.end(), []( agent* pAgent )
	{
		pAgent->start();
	} );

	agent::wait_for_all( agents.size(), &agents[0] );

	for_each( agents.begin(), agents.end(), []( agent* pAgent )
	{
		delete pAgent;
	} );

	for_each( groups.begin(), groups.end(), []( ScheduleGroup* pScheduleGroup )
	{
		pScheduleGroup->Release();
	} );
}

int main()
{
	wcout << L"Using EnhanceScheduleGroupLocality..." << endl;
	CurrentScheduler::Create( SchedulerPolicy( 3,
		MinConcurrency, 1,
		MaxConcurrency, 2,
		SchedulingProtocol, EnhanceScheduleGroupLocality ) );

	run_agents();
	CurrentScheduler::Detach();

	wcout << endl << endl;

	wcout << L"Using EnhanceForwardProgress..." << endl;
	CurrentScheduler::Create( SchedulerPolicy( 3,
		MinConcurrency, 1, 
		MaxConcurrency, 2,
		SchedulingProtocol, EnhanceForwardProgress ) );

	run_agents();
	CurrentScheduler::Detach();
}

[ 코드1. 처리되는 스케줄 그룹의 순서를 확인하는 예제 ]

 최대 동시성 리소스( computing core ) 의 개수를 2개로 하고, 한 번은 SchedulingProtocol 정책을 EnhanceScheduleGroupLocality 로 하고, 한 번은 EnhanceForwardProgress 으로 하여 작업의 순서를 알아보았습니다.

 2개의 스케줄 그룹을 생성하고, 하나의 그룹에 2개의 작업을 추가하여, 총 4개의 작업이 수행됩니다.

 예제의 spin_loop() 는 어떤 작업을 처리하는 것을 의미하고 wait() 는 양보를 뜻합니다. wait() 가 호출되면 다른 작업이 스케줄링 되는데 이 때 결정되는 작업이 어떤 작업인지는 앞서 설정한 SchedulingProtocol 정책에 따릅니다.

 EnhanceScheduleGroupLocality 로 설정된 경우에는 같은 스케줄 그룹 내의 작업이 처리되는 반면에, EnhanceForwardProgress 로 설정된 경우에는 다른 스케줄 그룹의 작업이 처리되게 됩니다.

 

[ 그림1. 처리되는 스케줄 그룹의 순서를 확인하는 예제 실행 결과 ]

[ 그림1. 처리되는 스케줄 그룹의 순서를 확인하는 예제 실행 결과 ]

 

마치는 글

 이렇게 스케줄링 정책과 스케줄 그룹을 통해서 스케줄링 순서를 결정하는데 관여할 수 있다는 것을 알아보았습니다.

다음 글에서는 스레드를 직접 생성하여 사용할 수 있는 ScheduleTask 라는 것에 대해서 알아보도록 하겠습니다.

Concurrency Runtime
– Task Scheduler 2. ( SchedulerPolicy )

작성자: 임준환( mumbi at daum dot net )

 

시작하는 글

 지난 글에서 Scheduler 를 통해서 스케줄링 방법을 설정할 수 있다고 설명 드렸습니다. 스케줄링 방법을 제어하기 위한 기반 정보를 스케줄러 정책이라고 표현하고, 우리는 이 스케줄링 정책을 사용해서 스케줄링을 제어할 수 있습니다.

스케줄러 정책을 대표하는 클래스가 바로 오늘 설명드릴 SchedulerPolicy 입니다.

 

SchedulerPolicy

 스케줄러에게 스케줄링에 필요한 기반 정보를 제공하는 클래스입니다. Scheduler 객체를 생성할 때, 이 SchedulerPolicy 를 지정할 수 있습니다. Scheduler 객체가 생성된 후에는 SchedulerPolicy 를 변경할 수 없습니다.

 

정책 생성과 설정

 스케줄러 정책을 설정하기 위해서는 SchedulerPolicy 객체를 생성할 때, 설정할 정책들을 생성자의 매개변수로 설정해야 합니다.

_CRTIMP SchedulerPolicy();
_CRTIMP SchedulerPolicy(size_t _PolicyKeyCount, ...);
_CRTIMP SchedulerPolicy(const SchedulerPolicy& _SrcPolicy);

[ 코드1. SchedulerPolicy 의 생성자 ]

 기본 생성자와 복사 생성자를 제공하고, 정책을 설정할 수 있는 생성자를 제공하고 있습니다.

 기본 생성자로 생성할 경우, 기본 설정으로 정책을 생성합니다.

 정책을 설정할 수 있는 생성자는 첫 번째 인수로 설정할 정책의 수를 입력하고, 그 뒤에 입력한 정책의 수 만큼 정책의 키( key ) 와 값( value ) 을 입력해야 합니다.

SchedulerPolicy policy(3,       
   MinConcurrency, 2,
   MaxConcurrency, 4,
   ContextPriority, THREAD_PRIORITY_HIGHEST
);

[ 코드2. SchedulerPolicy 객체의 생성 ]

 위와 같이 생성한 SchedulerPolicy 객체는 3개의 정책을 설정하는데, 사용할 최소 동시성 자원( computing core ) 의 개수는 2개, 사용할 최대 동시성 자원의 개수는 4개, 현재 컨텍스트의 스레드 우선순위를 최고 순위로 설정하게 됩니다.

 위 코드에서 보여드린 정책들 뿐만 아니라 여러 가지 정책들을 설정할 수 있습니다.

 

설정할 수 있는 정책들

 PolicyElementKey 열거형으로 설정할 수 있는 정책들을 정의하고 있습니다. 그 내용은 다음과 같습니다.

 정책 키  설명  기본 값
 SchedulerKind  작업들을 수행할 때, 일반 스레드를 사용할지, UMS 스레드를 사용할지 설정  ThreadScheduler
 MaxConcurrency  스케줄러에서 사용할 최대 동시성 자원 수  MaxExecutionResources
 MinConcurrency  스케줄러에서 사용할 최소 동시성 자원 수  1
 TargetOversubscriptionFactor  작업을 수행 시, 자원에 할당할 스레드의 수  1
 LocalContextCacheSize  캐시할 수 있는 컨텍스트 수  8
 ContextStackSize  각 컨텍스트에서 사용할 스택 크기( KB )  0( 기본 스택 크기 사용 )
 ContextPriority  각 컨텍스트의 스레드 우선 순위  THREAD_PRIORITY_NORMAL
 SchedulingProtocal  스케줄 그룹의 작업 예약 알고리즘  EnhanceScheduleGroupLocality
 DynamicProgressFeedback  자원 통계 정책, 사용 금지.  ProgressFeedbackEnabled

[ 표1. PolicyElementKey 열거형에 정의된 설정할 수 있는 정책들 ]

기본 정책 설정

 Asynchronous Agents Library( 이하, AAL ) 에는 우리가 생성한 정책을 지정한 Scheduler 객체를 사용할 수 있지만, Parallel Patterns Library ( 이하, PPL ) 에는 우리가 생성한 Scheduler 객체를 사용할 수 없고, 내부적으로 생성되는 기본 스케줄러를 사용하게 됩니다. 기본 스케줄러는 기본 정책을 사용하게 되는데, 이 기본 정책을 미리 설정해 둘 수 있습니다.

_CRTIMP static void __cdecl SetDefaultSchedulerPolicy(
   const SchedulerPolicy& _Policy
);

[ 코드3. SetDefaultSchedulerPolicy() 의 선언 ]

 SetDefaultSchedulerPolicy() 의 인자로 SchedulerPolicy 를 생성하여 설정하면, 직접 Scheduler 를 생성하여 사용하지 않더라도, 내부에서 생성되는 기본 스케줄러에도 정책을 설정할 수 있습니다.

 

스케줄러 정책 획득

 이미 생성된 Scheduler 객체로부터 설정된 정책을 가져올 수 있습니다.

_CRTIMP static SchedulerPolicy __cdecl GetPolicy();	// CurrentScheduler::GetPolicy() static function
virtual SchedulerPolicy GetPolicy(); 			// Scheduler::GetPolicy() member function

[ 코드4. GetPolicy() 들의 선언 ]

 Scheduler 객체로부터 얻어온 정책은 설정할 때의 정책과 다를 수 있습니다. 정책을 설정하더라도, 해당 시스템에서 설정 가능한 정책만 적용되고, 그렇지 않은 경우에는 기본 정책 값이나 Resource Manager 에 의해 적당한 값으로 적용됩니다.

예를 들어 UMS 를 사용할 수 없는 시스템에서 UMS 를 사용하라고 설정하더라도, ThreadScheduler로 설정됩니다.

 

예제

 Scheduler 를 사용하는 방법과 SchedulerPolicy 인해 어떤 영향을 받는지 알아보는 예제를 보도록 하겠습니다.

 

시나리오

 문자열 순열을 구하는 프로그램입니다. 문자열 순열이란 문자열을 이루는 알파벳들로 만들 수 있는 모든 경우의 수를 말합니다.

문자열 순열을 구하는 작업은 오랜 시간이 걸리므로 agent 를 이용해 비 동기 처리를 합니다. 그리고  문자열 순열을 구하는 작업을 하는 agent 와 통신을 하며 진행 상황을 출력해주는 agent 가 비 동기로 처리됩니다.

 이 때, Concurrency Runtime 은 협조적 스케줄링을 하기 때문에 바쁜 스케줄러에 더 많은 자원을 할당합니다. 반대로 바쁘지 않은 스케줄러에는 적은 자원이 할당됩니다. 이로 인해 진행 상황을 출력하는 agent 가 제대로 스케줄링되지 않는 현상이 발생하게 됩니다.

 이의 해결책으로 진행 상황을 출력하는 agent 의 스레드 우선 순위를 높게 설정합니다.

 

코드

#include <Windows.h>
#include <ppl.h>
#include <agents.h>
#include <iostream>
#include <sstream>

using namespace std;
using namespace Concurrency;

// 문자열 순열을 구하는 agent
class permutor
	: public agent
{
public:
	explicit permutor( ISource< wstring >& source, ITarget< unsigned int >& progress )
		: source( source )
		, progress( progress ) { }

	explicit permutor( ISource< wstring >& source, ITarget< unsigned int >& progress, Scheduler& scheduler )
		: agent( scheduler )
		, source( source )
		, progress( progress ) { }	

protected:
	void run()
	{
		wstring s = receive( this->source );

		this->permute( s );

		this->done();
	}

	unsigned int factorial( unsigned int n )
	{
		if( 0 == n )
			return 0;

		if( 1 == n )
			return 1;

		return n * this->factorial( n - 1 );
	}

	wstring permutation( int n, const wstring& s )
	{
		wstring t( s );

		size_t len = t.length();

		for( unsigned int i = 2; i <  len; ++i )
		{
			swap( t[ n % i ], t[i] );
			n = n / i;
		}

		return t;
	}

	void permute( const wstring& s )
	{
		unsigned int permutation_count = this->factorial( s.length() );

		long count = 0;

		unsigned int previous_percent = 0u;

		send( this->progress, previous_percent );

		parallel_for( 0u, permutation_count, [&]( unsigned int i )
		{
			this->permutation( i, s );

			unsigned int percent = 100 * InterlockedIncrement( &count ) / permutation_count;
			if( percent > previous_percent )
			{
				send( this->progress, percent );
				previous_percent = percent;
			}
		} );

		send( this->progress, 100u );
	}

private:
	ISource< wstring >&			source;
	ITarget< unsigned int >&	progress;
};

// 진행 상황을 출력하는 agent
class printer
	: public agent
{
public:
	explicit printer( ISource< wstring >& source, ISource< unsigned int >& progress )
		: source( source )
		, progress( progress ) { }

	explicit printer( ISource< wstring >& source, ISource< unsigned int >& progress, Scheduler& scheduler )
		: agent( scheduler )
		, source( source )
		, progress( progress ) { }

protected:
	void run()
	{
		wstringstream ss;
		ss << L"Computing all permutations of '" << receive( this->source ) << L"'..." << endl;
		wcout << ss.str();

		unsigned int previous_percent = 0u;

		while( true )
		{
			unsigned int percent = receive( this->progress );

			if( percent > previous_percent || percent == 0u )
			{
				wstringstream ss;
				ss << L'\r' << percent << L"% complete...";
				wcout << ss.str();
				previous_percent = percent;
			}

			if( 100 == percent )
				break;
		}

		wcout << endl;

		this->done();
	}

private:
	ISource< wstring >&			source;
	ISource< unsigned int >&	progress;
};

// agent 의 작업을 관리하는 함수
void permute_string( const wstring& source, Scheduler& permutor_scheduler, Scheduler& printer_scheduler )
{
	single_assignment< wstring > source_string;
	unbounded_buffer< unsigned int > progress;

	permutor agent1( source_string, progress, permutor_scheduler );
	printer agent2( source_string, progress, printer_scheduler );

	agent1.start();
	agent2.start();

	send( source_string, source );

	agent::wait( &agent1 );
	agent::wait( &agent2 );
}

int main()
{
	const wstring source( L"Grapefruit" );

	// 기본 정책으로 작업을 수행
	Scheduler* pDefault_scheduler = CurrentScheduler::Get();

	wcout << L"With default scheduler: " << endl;
	permute_string( source, *pDefault_scheduler, *pDefault_scheduler );
	wcout << endl;

	// 진행 상황을 출력하는 agent 에 필요한 스레드 우선 순위를 높게 하는 정책을 설정하여 적용
	SchedulerPolicy printer_policy( 1, ContextPriority, THREAD_PRIORITY_HIGHEST );
	Scheduler* pPrinter_scheduler = Scheduler::Create( printer_policy );

	HANDLE hShutdownEvent = CreateEvent( NULL, FALSE, FALSE, NULL );
	pPrinter_scheduler->RegisterShutdownEvent( hShutdownEvent );

	wcout << L"With higher context priority: " << endl;
	permute_string( source, *pDefault_scheduler, *pPrinter_scheduler );
	wcout << endl;

	pPrinter_scheduler->Release();

	WaitForSingleObject( hShutdownEvent, INFINITE );
	CloseHandle( hShutdownEvent );
}

[ 코드5. Scheduler 객체에 스레드 우선 순위를 높인 SchedulerPolicy 객체를 적용한 예제 ]

 기본 정책으로 수행했을 때에는 작업 진행 상황이 제대로 출력되지 않는 반면에, 스레드 우선 순위를 높게 설정한 경우에는 제대로 출력되는 것을 보실 수 있습니다.

[ 그림1. SchedulerPolicy 로 스레드 우선 순위를 변경한 예제 실행 결과 ]

[ 그림1. SchedulerPolicy 로 스레드 우선 순위를 변경한 예제 실행 결과 ]

 

마치는 글

 이번 글에서는 Scheduler 의 기본적인 기능 중 하나인 SchedulerPolicy 를 설정하는 방법을 알아보았습니다. SchedulerPolicy 를 이용하여 기본 정책으로 해결되지 않는 다양한 문제점들을 해결 하실 수 있을 것입니다.

Concurrency Runtime – Task Scheduler 1. ( Scheduler )

VC++ 10 Concurrency Runtime 2010. 9. 2. 08:30 Posted by 알 수 없는 사용자

Concurrency Runtime
– Task Scheduler 1. ( Scheduler )

작성자: 임준환( mumbi at daum dot net )

 

시작하는 글

 이번 글은 Parallel Patterns Library( 이하 PPL ) 과 Asynchronous Agents Library( 이하 AAL ) 내부에서 스케줄링을 하는 Scheduler 에 대해서 알아보도록 하겠습니다.

 

Scheduler class

 Scheduler 클래스는 Concurrency Runtime 에서 실제로 스케줄링을 하는 객체입니다. 우리는 Scheduler 객체를 사용해서 스케줄링의 방법을 설정할 수 있습니다.

 Scheduler 는 내부적으로 작업들을 그룹화한 ScheduleGroup 을 관리합니다. 또한 요청된 작업을 수행하는 Context 객체에 접근할 수 있도록 하여, 좀 더 구체적인 스케줄링을 할 수 있도록 도와줍니다.

 

Scheduler 생성

 우리가 직접 Scheduler 를 생성하지 않아도, Concurrency Runtime 내부에서 기본 Scheduler 가 생성되어 스케줄링을 하게 됩니다. 이 경우에는 스케줄링 정책을 바꿀 수는 있으나, 세밀하게 제어할 수 없습니다.

 기본 Scheduler 외에 직접 우리가 Scheduler 를 생성하는 방법은 2 가지가 있습니다.

  • CurrentScheduler::Create() 는 현재 컨텍스트와 연결하는 Scheduler 를 만듭니다.
  • Scheduler::Create() 는 현재 컨텍스트와 연결되지 않는 Scheduler 를 만듭니다.

 Scheduler 는 내부적으로 참조 개수( reference counting ) 을 사용하여, 수명을 관리합니다. 그래서 참조 개수가 0이 되면 Scheduler 가 소멸됩니다.

 

Scheduler::Create()

 현재 컨텍스트와 연결되지 않은 Scheduler 를 생성합니다. 참조 개수가 1로 설정됩니다.

 

Scheduler::Attach()

 현재 컨텍스트와 Scheduler 를 연결합니다. 참조 개수가 증가합니다.

 

Scheduler::Reference()

 참조 개수가 증가합니다.

 

Scheduler::Release()

 참조 개수가 감소합니다. 참조 개수가 0이 되면 소멸됩니다.

 

CurrentScheduler::Create()

 현재 컨텍스트와 연결된 Scheduler 를 생성합니다. 참조 개수가 1로 설정됩니다.

 

CurrentScheduler::Detach()

 현재 컨텍스트를 분리합니다. 참조 개수가 감소합니다. 참조 개수가 0이 되면 소멸됩니다.

 

생성과 소멸, 연결과 분리

 위와 같은 함수들을 제공하지만, 생성과 소멸, 연결과 분리가 짝을 이루어야 합니다.

 CurrentScheduler::Create() 로 생성하였다면, CurrentScheduler::Detach() 로 소멸시키는 것이 좋습니다.

 Scheduler::Create() 로 생성하고, Scheduler::Attach() 로 연결하였다면, Scheduler::Detach() 로 해제하고, Scheduler::Release() 로 소멸해야 합니다.

 만약 Scheduler::Reference() 를 통해 참조 개수를 증가시켰다면, Scheduler::Release() 를 사용하여 참조 개수를 감소시켜주어야 합니다.

 

소멸 시점 알림

 모든 작업이 끝나기 전에는 Scheduler 를 소멸시키지 않습니다. 언제 Scheduler 가 소멸되는지 알기 위해서는 RegisterShutdownEvent() 를 사용하여 Windows API 의 EVENT 객체를 지정해 주고, WaitForSingleObject() 를 사용하여 소멸될 때까지 대기할 수 있습니다.

 

그 외의 멤버 함수

 위에서 설명한 멤버 함수 이외에 제공하는 멤버 함수들을 알아보도록 하겠습니다.

 

CurrentScheduler

  • Get() – 현재 컨텍스트에 연결된 Scheduler 의 포인터를 반환합니다. 참조 개수가 증가하지 않습니다.
  • CreateScheduleGroup() -  ScheduleGroup 을 생성합니다.
  • ScheduleTask() – Scheduler 의 일정 큐에 간단한 작업을 추가합니다.
  • GetPolicy() – 연결된 정책의 복사본을 반환합니다.

 

Scheduler

  • CreateScheduleGroup() – ScheduleGroup 을 생성합니다.
  • ScheduleTask() – Scheduler 의 일정 큐에 간단한 작업을 추가합니다.
  • GetPolicy() – 연결된 정책의 복사본을 반환합니다.
  • SetDefaultSchedulePolicy() – 기본 Scheduler 에 적용될 정책을 설정합니다.
  • ResetDefaultSchedulePolicy() – 기본 Scheduler 의 정책을 SetDefaultSchedulerPolicy() 를 사용하기 전의 정책으로 설정합니다.

 

마치는 글

 이번 글에서는 Concurrency Runtime 의 Scheduler 에 대해서 알아보았습니다. 위의 설명만으로는 어떻게 사용해야 하는지, 어떤 기능을 하는지 알기 어렵습니다.

다음 글에서 위에서 소개해드린 멤버 함수들의 사용 방법과 활용 예제들에 대해서 알아보도록 하겠습니다.