Concurrency Runtime
– Task Scheduler 3. ( ScheduleGroup )

작성자: 임준환( mumbi at daum dot net )

 

시작하는 글

 Scheduler 는 항상 하나 이상의 스케줄 그룹을 가지고 있습니다. 스케줄 그룹이 무엇인지 알아보고 동시성 프로그래밍에 어떤 영향을 주는지 알아보도록 하겠습니다.

 

SchedulerGroup

 SchedulerGroup 이란 Scheduler 가 스케줄링 해야 할 작업들을 모아 놓은 스케줄 그룹을 대표하는 클래스입니다.

 

정책

 이전 글에서 본 스케줄러 정책들 중 SchedulingProtocol 정책은 스케줄링 순서를 변경하여 작업들의 처리 순서를 변경할 수 있습니다. ScheduleringProtocol 정책이 EnhanseScheduleGroupLocality 로 설정된 경우에는 스케줄링 중인 스케줄 그룹을 변경하지 않고, 현재 스케줄링 중인 스케줄 그룹을 다른 스케줄 그룹보다 먼저 처리합니다. 반면에 SchedulingProtocol 정책이 EnhanseForwardProgress 로 설정된 경우에는 다른 스케줄링 그룹의 작업을 처리합니다. 스케줄 그룹 간에 작업을 공평하게 처리하게 됩니다.

 

생성

 CurrentScheduler::CreateScheduleGroup() 또는 Scheduler::CreateScheduleGroup() 을 통해 ScheduleGroup 객체를 생성할 수 있습니다.

 Scheduler 가 참조 개수를 사용하여 수명을 관리하는 것처럼 ScheduleGroup 객체 또한 참조 개수를 사용합니다. 생성되는 순간 참조 개수는 1이 됩니다. SchedulerGroup::Reference() 는 참조 개수는 1이 증가하고, SchedulerGroup::Release() 는 참조 개수가 1이 감소하게 됩니다.

 

동작 및 사용

 Scheduler 객체에는 기본적으로 기본 ScheduleGroup 을 가지고 있습니다. 명시적으로 ScheduleGroup 을 생성하지 않는다면 스케줄링 해야 할 작업들은 기본 ScheduleGroup 에 추가됩니다.

 ScheduleGroup 은 Concurrency Runtime 중 Asynchronous Agents Library( 이하, AAL ) 에서 사용할 수 있습니다. agent 클래스나 message block 들의 생성자로 ScheduleGroup 을 전달하여 사용할 수 있습니다.

 

예제

 ScheduleGroup 을 어떻게 사용하는지 이해를 도울 예제를 살펴보겠습니다.

 

시나리오

 단순하게 어떤 스케줄 그룹의 작업들이 어떤 순서로 처리되는지 알아보는 예제입니다.

 

코드

#include <agents.h>
#include <vector>
#include <algorithm>
#include <iostream>
#include <sstream>

using namespace std;
using namespace Concurrency;

#pragma optimize( "", off )

void spin_loop()
{
	for( unsigned int i = 0; i < 500000000; ++i )
	{
	}
}

#pragma optimize( "", on )

class work_yield_agent
	: public agent
{
public:
	explicit work_yield_agent( unsigned int group_number, unsigned int task_number )
		: group_number( group_number )
		, task_number( task_number ) { }

	explicit work_yield_agent( ScheduleGroup& scheduleGroup, unsigned int group_number, unsigned int task_number )
		: agent( scheduleGroup )
		, group_number( group_number )
		, task_number( task_number ) { }

protected:
	void run()
	{
		wstringstream header, ss;

		header << L"agent no. " << this->group_number << L"-" << this->task_number << L": ";

		ss << header.str() << L"first loop..." << endl;
		wcout << ss.str();
		spin_loop();

		ss = wstringstream();
		ss << header.str() << L"waiting..." << endl;
		wcout << ss.str();
		Concurrency::wait( 0 );

		ss = wstringstream();
		ss<< header.str() << L"second loop..." << endl;
		wcout << ss.str();
		spin_loop();

		ss = wstringstream();
		ss << header.str() << L"finished..." << endl;
		wcout << ss.str();

		this->done();
	}

private:
	unsigned int	group_number;
	unsigned int	task_number;
};

void run_agents()
{
	const unsigned int group_count = 2;
	const unsigned int tasks_per_group = 2;

	vector< ScheduleGroup* > groups;
	vector< agent* > agents;

	for( unsigned int group_index = 0; group_index < group_count; ++group_index )
	{
		groups.push_back( CurrentScheduler::CreateScheduleGroup() );

		for( unsigned int task_index = 0; task_index < tasks_per_group; ++task_index )
		{
			agents.push_back( new work_yield_agent( *groups.back(), group_index, task_index ) );
		}
	}

	for_each( agents.begin(), agents.end(), []( agent* pAgent )
	{
		pAgent->start();
	} );

	agent::wait_for_all( agents.size(), &agents[0] );

	for_each( agents.begin(), agents.end(), []( agent* pAgent )
	{
		delete pAgent;
	} );

	for_each( groups.begin(), groups.end(), []( ScheduleGroup* pScheduleGroup )
	{
		pScheduleGroup->Release();
	} );
}

int main()
{
	wcout << L"Using EnhanceScheduleGroupLocality..." << endl;
	CurrentScheduler::Create( SchedulerPolicy( 3,
		MinConcurrency, 1,
		MaxConcurrency, 2,
		SchedulingProtocol, EnhanceScheduleGroupLocality ) );

	run_agents();
	CurrentScheduler::Detach();

	wcout << endl << endl;

	wcout << L"Using EnhanceForwardProgress..." << endl;
	CurrentScheduler::Create( SchedulerPolicy( 3,
		MinConcurrency, 1, 
		MaxConcurrency, 2,
		SchedulingProtocol, EnhanceForwardProgress ) );

	run_agents();
	CurrentScheduler::Detach();
}

[ 코드1. 처리되는 스케줄 그룹의 순서를 확인하는 예제 ]

 최대 동시성 리소스( computing core ) 의 개수를 2개로 하고, 한 번은 SchedulingProtocol 정책을 EnhanceScheduleGroupLocality 로 하고, 한 번은 EnhanceForwardProgress 으로 하여 작업의 순서를 알아보았습니다.

 2개의 스케줄 그룹을 생성하고, 하나의 그룹에 2개의 작업을 추가하여, 총 4개의 작업이 수행됩니다.

 예제의 spin_loop() 는 어떤 작업을 처리하는 것을 의미하고 wait() 는 양보를 뜻합니다. wait() 가 호출되면 다른 작업이 스케줄링 되는데 이 때 결정되는 작업이 어떤 작업인지는 앞서 설정한 SchedulingProtocol 정책에 따릅니다.

 EnhanceScheduleGroupLocality 로 설정된 경우에는 같은 스케줄 그룹 내의 작업이 처리되는 반면에, EnhanceForwardProgress 로 설정된 경우에는 다른 스케줄 그룹의 작업이 처리되게 됩니다.

 

[ 그림1. 처리되는 스케줄 그룹의 순서를 확인하는 예제 실행 결과 ]

[ 그림1. 처리되는 스케줄 그룹의 순서를 확인하는 예제 실행 결과 ]

 

마치는 글

 이렇게 스케줄링 정책과 스케줄 그룹을 통해서 스케줄링 순서를 결정하는데 관여할 수 있다는 것을 알아보았습니다.

다음 글에서는 스레드를 직접 생성하여 사용할 수 있는 ScheduleTask 라는 것에 대해서 알아보도록 하겠습니다.