Concurrency Runtime
– Task Scheduler 2. ( SchedulerPolicy )

작성자: 임준환( mumbi at daum dot net )

 

시작하는 글

 지난 글에서 Scheduler 를 통해서 스케줄링 방법을 설정할 수 있다고 설명 드렸습니다. 스케줄링 방법을 제어하기 위한 기반 정보를 스케줄러 정책이라고 표현하고, 우리는 이 스케줄링 정책을 사용해서 스케줄링을 제어할 수 있습니다.

스케줄러 정책을 대표하는 클래스가 바로 오늘 설명드릴 SchedulerPolicy 입니다.

 

SchedulerPolicy

 스케줄러에게 스케줄링에 필요한 기반 정보를 제공하는 클래스입니다. Scheduler 객체를 생성할 때, 이 SchedulerPolicy 를 지정할 수 있습니다. Scheduler 객체가 생성된 후에는 SchedulerPolicy 를 변경할 수 없습니다.

 

정책 생성과 설정

 스케줄러 정책을 설정하기 위해서는 SchedulerPolicy 객체를 생성할 때, 설정할 정책들을 생성자의 매개변수로 설정해야 합니다.

_CRTIMP SchedulerPolicy();
_CRTIMP SchedulerPolicy(size_t _PolicyKeyCount, ...);
_CRTIMP SchedulerPolicy(const SchedulerPolicy& _SrcPolicy);

[ 코드1. SchedulerPolicy 의 생성자 ]

 기본 생성자와 복사 생성자를 제공하고, 정책을 설정할 수 있는 생성자를 제공하고 있습니다.

 기본 생성자로 생성할 경우, 기본 설정으로 정책을 생성합니다.

 정책을 설정할 수 있는 생성자는 첫 번째 인수로 설정할 정책의 수를 입력하고, 그 뒤에 입력한 정책의 수 만큼 정책의 키( key ) 와 값( value ) 을 입력해야 합니다.

SchedulerPolicy policy(3,       
   MinConcurrency, 2,
   MaxConcurrency, 4,
   ContextPriority, THREAD_PRIORITY_HIGHEST
);

[ 코드2. SchedulerPolicy 객체의 생성 ]

 위와 같이 생성한 SchedulerPolicy 객체는 3개의 정책을 설정하는데, 사용할 최소 동시성 자원( computing core ) 의 개수는 2개, 사용할 최대 동시성 자원의 개수는 4개, 현재 컨텍스트의 스레드 우선순위를 최고 순위로 설정하게 됩니다.

 위 코드에서 보여드린 정책들 뿐만 아니라 여러 가지 정책들을 설정할 수 있습니다.

 

설정할 수 있는 정책들

 PolicyElementKey 열거형으로 설정할 수 있는 정책들을 정의하고 있습니다. 그 내용은 다음과 같습니다.

 정책 키  설명  기본 값
 SchedulerKind  작업들을 수행할 때, 일반 스레드를 사용할지, UMS 스레드를 사용할지 설정  ThreadScheduler
 MaxConcurrency  스케줄러에서 사용할 최대 동시성 자원 수  MaxExecutionResources
 MinConcurrency  스케줄러에서 사용할 최소 동시성 자원 수  1
 TargetOversubscriptionFactor  작업을 수행 시, 자원에 할당할 스레드의 수  1
 LocalContextCacheSize  캐시할 수 있는 컨텍스트 수  8
 ContextStackSize  각 컨텍스트에서 사용할 스택 크기( KB )  0( 기본 스택 크기 사용 )
 ContextPriority  각 컨텍스트의 스레드 우선 순위  THREAD_PRIORITY_NORMAL
 SchedulingProtocal  스케줄 그룹의 작업 예약 알고리즘  EnhanceScheduleGroupLocality
 DynamicProgressFeedback  자원 통계 정책, 사용 금지.  ProgressFeedbackEnabled

[ 표1. PolicyElementKey 열거형에 정의된 설정할 수 있는 정책들 ]

기본 정책 설정

 Asynchronous Agents Library( 이하, AAL ) 에는 우리가 생성한 정책을 지정한 Scheduler 객체를 사용할 수 있지만, Parallel Patterns Library ( 이하, PPL ) 에는 우리가 생성한 Scheduler 객체를 사용할 수 없고, 내부적으로 생성되는 기본 스케줄러를 사용하게 됩니다. 기본 스케줄러는 기본 정책을 사용하게 되는데, 이 기본 정책을 미리 설정해 둘 수 있습니다.

_CRTIMP static void __cdecl SetDefaultSchedulerPolicy(
   const SchedulerPolicy& _Policy
);

[ 코드3. SetDefaultSchedulerPolicy() 의 선언 ]

 SetDefaultSchedulerPolicy() 의 인자로 SchedulerPolicy 를 생성하여 설정하면, 직접 Scheduler 를 생성하여 사용하지 않더라도, 내부에서 생성되는 기본 스케줄러에도 정책을 설정할 수 있습니다.

 

스케줄러 정책 획득

 이미 생성된 Scheduler 객체로부터 설정된 정책을 가져올 수 있습니다.

_CRTIMP static SchedulerPolicy __cdecl GetPolicy();	// CurrentScheduler::GetPolicy() static function
virtual SchedulerPolicy GetPolicy(); 			// Scheduler::GetPolicy() member function

[ 코드4. GetPolicy() 들의 선언 ]

 Scheduler 객체로부터 얻어온 정책은 설정할 때의 정책과 다를 수 있습니다. 정책을 설정하더라도, 해당 시스템에서 설정 가능한 정책만 적용되고, 그렇지 않은 경우에는 기본 정책 값이나 Resource Manager 에 의해 적당한 값으로 적용됩니다.

예를 들어 UMS 를 사용할 수 없는 시스템에서 UMS 를 사용하라고 설정하더라도, ThreadScheduler로 설정됩니다.

 

예제

 Scheduler 를 사용하는 방법과 SchedulerPolicy 인해 어떤 영향을 받는지 알아보는 예제를 보도록 하겠습니다.

 

시나리오

 문자열 순열을 구하는 프로그램입니다. 문자열 순열이란 문자열을 이루는 알파벳들로 만들 수 있는 모든 경우의 수를 말합니다.

문자열 순열을 구하는 작업은 오랜 시간이 걸리므로 agent 를 이용해 비 동기 처리를 합니다. 그리고  문자열 순열을 구하는 작업을 하는 agent 와 통신을 하며 진행 상황을 출력해주는 agent 가 비 동기로 처리됩니다.

 이 때, Concurrency Runtime 은 협조적 스케줄링을 하기 때문에 바쁜 스케줄러에 더 많은 자원을 할당합니다. 반대로 바쁘지 않은 스케줄러에는 적은 자원이 할당됩니다. 이로 인해 진행 상황을 출력하는 agent 가 제대로 스케줄링되지 않는 현상이 발생하게 됩니다.

 이의 해결책으로 진행 상황을 출력하는 agent 의 스레드 우선 순위를 높게 설정합니다.

 

코드

#include <Windows.h>
#include <ppl.h>
#include <agents.h>
#include <iostream>
#include <sstream>

using namespace std;
using namespace Concurrency;

// 문자열 순열을 구하는 agent
class permutor
	: public agent
{
public:
	explicit permutor( ISource< wstring >& source, ITarget< unsigned int >& progress )
		: source( source )
		, progress( progress ) { }

	explicit permutor( ISource< wstring >& source, ITarget< unsigned int >& progress, Scheduler& scheduler )
		: agent( scheduler )
		, source( source )
		, progress( progress ) { }	

protected:
	void run()
	{
		wstring s = receive( this->source );

		this->permute( s );

		this->done();
	}

	unsigned int factorial( unsigned int n )
	{
		if( 0 == n )
			return 0;

		if( 1 == n )
			return 1;

		return n * this->factorial( n - 1 );
	}

	wstring permutation( int n, const wstring& s )
	{
		wstring t( s );

		size_t len = t.length();

		for( unsigned int i = 2; i <  len; ++i )
		{
			swap( t[ n % i ], t[i] );
			n = n / i;
		}

		return t;
	}

	void permute( const wstring& s )
	{
		unsigned int permutation_count = this->factorial( s.length() );

		long count = 0;

		unsigned int previous_percent = 0u;

		send( this->progress, previous_percent );

		parallel_for( 0u, permutation_count, [&]( unsigned int i )
		{
			this->permutation( i, s );

			unsigned int percent = 100 * InterlockedIncrement( &count ) / permutation_count;
			if( percent > previous_percent )
			{
				send( this->progress, percent );
				previous_percent = percent;
			}
		} );

		send( this->progress, 100u );
	}

private:
	ISource< wstring >&			source;
	ITarget< unsigned int >&	progress;
};

// 진행 상황을 출력하는 agent
class printer
	: public agent
{
public:
	explicit printer( ISource< wstring >& source, ISource< unsigned int >& progress )
		: source( source )
		, progress( progress ) { }

	explicit printer( ISource< wstring >& source, ISource< unsigned int >& progress, Scheduler& scheduler )
		: agent( scheduler )
		, source( source )
		, progress( progress ) { }

protected:
	void run()
	{
		wstringstream ss;
		ss << L"Computing all permutations of '" << receive( this->source ) << L"'..." << endl;
		wcout << ss.str();

		unsigned int previous_percent = 0u;

		while( true )
		{
			unsigned int percent = receive( this->progress );

			if( percent > previous_percent || percent == 0u )
			{
				wstringstream ss;
				ss << L'\r' << percent << L"% complete...";
				wcout << ss.str();
				previous_percent = percent;
			}

			if( 100 == percent )
				break;
		}

		wcout << endl;

		this->done();
	}

private:
	ISource< wstring >&			source;
	ISource< unsigned int >&	progress;
};

// agent 의 작업을 관리하는 함수
void permute_string( const wstring& source, Scheduler& permutor_scheduler, Scheduler& printer_scheduler )
{
	single_assignment< wstring > source_string;
	unbounded_buffer< unsigned int > progress;

	permutor agent1( source_string, progress, permutor_scheduler );
	printer agent2( source_string, progress, printer_scheduler );

	agent1.start();
	agent2.start();

	send( source_string, source );

	agent::wait( &agent1 );
	agent::wait( &agent2 );
}

int main()
{
	const wstring source( L"Grapefruit" );

	// 기본 정책으로 작업을 수행
	Scheduler* pDefault_scheduler = CurrentScheduler::Get();

	wcout << L"With default scheduler: " << endl;
	permute_string( source, *pDefault_scheduler, *pDefault_scheduler );
	wcout << endl;

	// 진행 상황을 출력하는 agent 에 필요한 스레드 우선 순위를 높게 하는 정책을 설정하여 적용
	SchedulerPolicy printer_policy( 1, ContextPriority, THREAD_PRIORITY_HIGHEST );
	Scheduler* pPrinter_scheduler = Scheduler::Create( printer_policy );

	HANDLE hShutdownEvent = CreateEvent( NULL, FALSE, FALSE, NULL );
	pPrinter_scheduler->RegisterShutdownEvent( hShutdownEvent );

	wcout << L"With higher context priority: " << endl;
	permute_string( source, *pDefault_scheduler, *pPrinter_scheduler );
	wcout << endl;

	pPrinter_scheduler->Release();

	WaitForSingleObject( hShutdownEvent, INFINITE );
	CloseHandle( hShutdownEvent );
}

[ 코드5. Scheduler 객체에 스레드 우선 순위를 높인 SchedulerPolicy 객체를 적용한 예제 ]

 기본 정책으로 수행했을 때에는 작업 진행 상황이 제대로 출력되지 않는 반면에, 스레드 우선 순위를 높게 설정한 경우에는 제대로 출력되는 것을 보실 수 있습니다.

[ 그림1. SchedulerPolicy 로 스레드 우선 순위를 변경한 예제 실행 결과 ]

[ 그림1. SchedulerPolicy 로 스레드 우선 순위를 변경한 예제 실행 결과 ]

 

마치는 글

 이번 글에서는 Scheduler 의 기본적인 기능 중 하나인 SchedulerPolicy 를 설정하는 방법을 알아보았습니다. SchedulerPolicy 를 이용하여 기본 정책으로 해결되지 않는 다양한 문제점들을 해결 하실 수 있을 것입니다.