Concurrency Runtime
– Task Scheduler 2. ( SchedulerPolicy )

작성자: 임준환( mumbi at daum dot net )

 

시작하는 글

 지난 글에서 Scheduler 를 통해서 스케줄링 방법을 설정할 수 있다고 설명 드렸습니다. 스케줄링 방법을 제어하기 위한 기반 정보를 스케줄러 정책이라고 표현하고, 우리는 이 스케줄링 정책을 사용해서 스케줄링을 제어할 수 있습니다.

스케줄러 정책을 대표하는 클래스가 바로 오늘 설명드릴 SchedulerPolicy 입니다.

 

SchedulerPolicy

 스케줄러에게 스케줄링에 필요한 기반 정보를 제공하는 클래스입니다. Scheduler 객체를 생성할 때, 이 SchedulerPolicy 를 지정할 수 있습니다. Scheduler 객체가 생성된 후에는 SchedulerPolicy 를 변경할 수 없습니다.

 

정책 생성과 설정

 스케줄러 정책을 설정하기 위해서는 SchedulerPolicy 객체를 생성할 때, 설정할 정책들을 생성자의 매개변수로 설정해야 합니다.

_CRTIMP SchedulerPolicy();
_CRTIMP SchedulerPolicy(size_t _PolicyKeyCount, ...);
_CRTIMP SchedulerPolicy(const SchedulerPolicy& _SrcPolicy);

[ 코드1. SchedulerPolicy 의 생성자 ]

 기본 생성자와 복사 생성자를 제공하고, 정책을 설정할 수 있는 생성자를 제공하고 있습니다.

 기본 생성자로 생성할 경우, 기본 설정으로 정책을 생성합니다.

 정책을 설정할 수 있는 생성자는 첫 번째 인수로 설정할 정책의 수를 입력하고, 그 뒤에 입력한 정책의 수 만큼 정책의 키( key ) 와 값( value ) 을 입력해야 합니다.

SchedulerPolicy policy(3,       
   MinConcurrency, 2,
   MaxConcurrency, 4,
   ContextPriority, THREAD_PRIORITY_HIGHEST
);

[ 코드2. SchedulerPolicy 객체의 생성 ]

 위와 같이 생성한 SchedulerPolicy 객체는 3개의 정책을 설정하는데, 사용할 최소 동시성 자원( computing core ) 의 개수는 2개, 사용할 최대 동시성 자원의 개수는 4개, 현재 컨텍스트의 스레드 우선순위를 최고 순위로 설정하게 됩니다.

 위 코드에서 보여드린 정책들 뿐만 아니라 여러 가지 정책들을 설정할 수 있습니다.

 

설정할 수 있는 정책들

 PolicyElementKey 열거형으로 설정할 수 있는 정책들을 정의하고 있습니다. 그 내용은 다음과 같습니다.

 정책 키  설명  기본 값
 SchedulerKind  작업들을 수행할 때, 일반 스레드를 사용할지, UMS 스레드를 사용할지 설정  ThreadScheduler
 MaxConcurrency  스케줄러에서 사용할 최대 동시성 자원 수  MaxExecutionResources
 MinConcurrency  스케줄러에서 사용할 최소 동시성 자원 수  1
 TargetOversubscriptionFactor  작업을 수행 시, 자원에 할당할 스레드의 수  1
 LocalContextCacheSize  캐시할 수 있는 컨텍스트 수  8
 ContextStackSize  각 컨텍스트에서 사용할 스택 크기( KB )  0( 기본 스택 크기 사용 )
 ContextPriority  각 컨텍스트의 스레드 우선 순위  THREAD_PRIORITY_NORMAL
 SchedulingProtocal  스케줄 그룹의 작업 예약 알고리즘  EnhanceScheduleGroupLocality
 DynamicProgressFeedback  자원 통계 정책, 사용 금지.  ProgressFeedbackEnabled

[ 표1. PolicyElementKey 열거형에 정의된 설정할 수 있는 정책들 ]

기본 정책 설정

 Asynchronous Agents Library( 이하, AAL ) 에는 우리가 생성한 정책을 지정한 Scheduler 객체를 사용할 수 있지만, Parallel Patterns Library ( 이하, PPL ) 에는 우리가 생성한 Scheduler 객체를 사용할 수 없고, 내부적으로 생성되는 기본 스케줄러를 사용하게 됩니다. 기본 스케줄러는 기본 정책을 사용하게 되는데, 이 기본 정책을 미리 설정해 둘 수 있습니다.

_CRTIMP static void __cdecl SetDefaultSchedulerPolicy(
   const SchedulerPolicy& _Policy
);

[ 코드3. SetDefaultSchedulerPolicy() 의 선언 ]

 SetDefaultSchedulerPolicy() 의 인자로 SchedulerPolicy 를 생성하여 설정하면, 직접 Scheduler 를 생성하여 사용하지 않더라도, 내부에서 생성되는 기본 스케줄러에도 정책을 설정할 수 있습니다.

 

스케줄러 정책 획득

 이미 생성된 Scheduler 객체로부터 설정된 정책을 가져올 수 있습니다.

_CRTIMP static SchedulerPolicy __cdecl GetPolicy();	// CurrentScheduler::GetPolicy() static function
virtual SchedulerPolicy GetPolicy(); 			// Scheduler::GetPolicy() member function

[ 코드4. GetPolicy() 들의 선언 ]

 Scheduler 객체로부터 얻어온 정책은 설정할 때의 정책과 다를 수 있습니다. 정책을 설정하더라도, 해당 시스템에서 설정 가능한 정책만 적용되고, 그렇지 않은 경우에는 기본 정책 값이나 Resource Manager 에 의해 적당한 값으로 적용됩니다.

예를 들어 UMS 를 사용할 수 없는 시스템에서 UMS 를 사용하라고 설정하더라도, ThreadScheduler로 설정됩니다.

 

예제

 Scheduler 를 사용하는 방법과 SchedulerPolicy 인해 어떤 영향을 받는지 알아보는 예제를 보도록 하겠습니다.

 

시나리오

 문자열 순열을 구하는 프로그램입니다. 문자열 순열이란 문자열을 이루는 알파벳들로 만들 수 있는 모든 경우의 수를 말합니다.

문자열 순열을 구하는 작업은 오랜 시간이 걸리므로 agent 를 이용해 비 동기 처리를 합니다. 그리고  문자열 순열을 구하는 작업을 하는 agent 와 통신을 하며 진행 상황을 출력해주는 agent 가 비 동기로 처리됩니다.

 이 때, Concurrency Runtime 은 협조적 스케줄링을 하기 때문에 바쁜 스케줄러에 더 많은 자원을 할당합니다. 반대로 바쁘지 않은 스케줄러에는 적은 자원이 할당됩니다. 이로 인해 진행 상황을 출력하는 agent 가 제대로 스케줄링되지 않는 현상이 발생하게 됩니다.

 이의 해결책으로 진행 상황을 출력하는 agent 의 스레드 우선 순위를 높게 설정합니다.

 

코드

#include <Windows.h>
#include <ppl.h>
#include <agents.h>
#include <iostream>
#include <sstream>

using namespace std;
using namespace Concurrency;

// 문자열 순열을 구하는 agent
class permutor
	: public agent
{
public:
	explicit permutor( ISource< wstring >& source, ITarget< unsigned int >& progress )
		: source( source )
		, progress( progress ) { }

	explicit permutor( ISource< wstring >& source, ITarget< unsigned int >& progress, Scheduler& scheduler )
		: agent( scheduler )
		, source( source )
		, progress( progress ) { }	

protected:
	void run()
	{
		wstring s = receive( this->source );

		this->permute( s );

		this->done();
	}

	unsigned int factorial( unsigned int n )
	{
		if( 0 == n )
			return 0;

		if( 1 == n )
			return 1;

		return n * this->factorial( n - 1 );
	}

	wstring permutation( int n, const wstring& s )
	{
		wstring t( s );

		size_t len = t.length();

		for( unsigned int i = 2; i <  len; ++i )
		{
			swap( t[ n % i ], t[i] );
			n = n / i;
		}

		return t;
	}

	void permute( const wstring& s )
	{
		unsigned int permutation_count = this->factorial( s.length() );

		long count = 0;

		unsigned int previous_percent = 0u;

		send( this->progress, previous_percent );

		parallel_for( 0u, permutation_count, [&]( unsigned int i )
		{
			this->permutation( i, s );

			unsigned int percent = 100 * InterlockedIncrement( &count ) / permutation_count;
			if( percent > previous_percent )
			{
				send( this->progress, percent );
				previous_percent = percent;
			}
		} );

		send( this->progress, 100u );
	}

private:
	ISource< wstring >&			source;
	ITarget< unsigned int >&	progress;
};

// 진행 상황을 출력하는 agent
class printer
	: public agent
{
public:
	explicit printer( ISource< wstring >& source, ISource< unsigned int >& progress )
		: source( source )
		, progress( progress ) { }

	explicit printer( ISource< wstring >& source, ISource< unsigned int >& progress, Scheduler& scheduler )
		: agent( scheduler )
		, source( source )
		, progress( progress ) { }

protected:
	void run()
	{
		wstringstream ss;
		ss << L"Computing all permutations of '" << receive( this->source ) << L"'..." << endl;
		wcout << ss.str();

		unsigned int previous_percent = 0u;

		while( true )
		{
			unsigned int percent = receive( this->progress );

			if( percent > previous_percent || percent == 0u )
			{
				wstringstream ss;
				ss << L'\r' << percent << L"% complete...";
				wcout << ss.str();
				previous_percent = percent;
			}

			if( 100 == percent )
				break;
		}

		wcout << endl;

		this->done();
	}

private:
	ISource< wstring >&			source;
	ISource< unsigned int >&	progress;
};

// agent 의 작업을 관리하는 함수
void permute_string( const wstring& source, Scheduler& permutor_scheduler, Scheduler& printer_scheduler )
{
	single_assignment< wstring > source_string;
	unbounded_buffer< unsigned int > progress;

	permutor agent1( source_string, progress, permutor_scheduler );
	printer agent2( source_string, progress, printer_scheduler );

	agent1.start();
	agent2.start();

	send( source_string, source );

	agent::wait( &agent1 );
	agent::wait( &agent2 );
}

int main()
{
	const wstring source( L"Grapefruit" );

	// 기본 정책으로 작업을 수행
	Scheduler* pDefault_scheduler = CurrentScheduler::Get();

	wcout << L"With default scheduler: " << endl;
	permute_string( source, *pDefault_scheduler, *pDefault_scheduler );
	wcout << endl;

	// 진행 상황을 출력하는 agent 에 필요한 스레드 우선 순위를 높게 하는 정책을 설정하여 적용
	SchedulerPolicy printer_policy( 1, ContextPriority, THREAD_PRIORITY_HIGHEST );
	Scheduler* pPrinter_scheduler = Scheduler::Create( printer_policy );

	HANDLE hShutdownEvent = CreateEvent( NULL, FALSE, FALSE, NULL );
	pPrinter_scheduler->RegisterShutdownEvent( hShutdownEvent );

	wcout << L"With higher context priority: " << endl;
	permute_string( source, *pDefault_scheduler, *pPrinter_scheduler );
	wcout << endl;

	pPrinter_scheduler->Release();

	WaitForSingleObject( hShutdownEvent, INFINITE );
	CloseHandle( hShutdownEvent );
}

[ 코드5. Scheduler 객체에 스레드 우선 순위를 높인 SchedulerPolicy 객체를 적용한 예제 ]

 기본 정책으로 수행했을 때에는 작업 진행 상황이 제대로 출력되지 않는 반면에, 스레드 우선 순위를 높게 설정한 경우에는 제대로 출력되는 것을 보실 수 있습니다.

[ 그림1. SchedulerPolicy 로 스레드 우선 순위를 변경한 예제 실행 결과 ]

[ 그림1. SchedulerPolicy 로 스레드 우선 순위를 변경한 예제 실행 결과 ]

 

마치는 글

 이번 글에서는 Scheduler 의 기본적인 기능 중 하나인 SchedulerPolicy 를 설정하는 방법을 알아보았습니다. SchedulerPolicy 를 이용하여 기본 정책으로 해결되지 않는 다양한 문제점들을 해결 하실 수 있을 것입니다.

Concurrency Runtime – Task Scheduler 1. ( Scheduler )

VC++ 10 Concurrency Runtime 2010. 9. 2. 08:30 Posted by 알 수 없는 사용자

Concurrency Runtime
– Task Scheduler 1. ( Scheduler )

작성자: 임준환( mumbi at daum dot net )

 

시작하는 글

 이번 글은 Parallel Patterns Library( 이하 PPL ) 과 Asynchronous Agents Library( 이하 AAL ) 내부에서 스케줄링을 하는 Scheduler 에 대해서 알아보도록 하겠습니다.

 

Scheduler class

 Scheduler 클래스는 Concurrency Runtime 에서 실제로 스케줄링을 하는 객체입니다. 우리는 Scheduler 객체를 사용해서 스케줄링의 방법을 설정할 수 있습니다.

 Scheduler 는 내부적으로 작업들을 그룹화한 ScheduleGroup 을 관리합니다. 또한 요청된 작업을 수행하는 Context 객체에 접근할 수 있도록 하여, 좀 더 구체적인 스케줄링을 할 수 있도록 도와줍니다.

 

Scheduler 생성

 우리가 직접 Scheduler 를 생성하지 않아도, Concurrency Runtime 내부에서 기본 Scheduler 가 생성되어 스케줄링을 하게 됩니다. 이 경우에는 스케줄링 정책을 바꿀 수는 있으나, 세밀하게 제어할 수 없습니다.

 기본 Scheduler 외에 직접 우리가 Scheduler 를 생성하는 방법은 2 가지가 있습니다.

  • CurrentScheduler::Create() 는 현재 컨텍스트와 연결하는 Scheduler 를 만듭니다.
  • Scheduler::Create() 는 현재 컨텍스트와 연결되지 않는 Scheduler 를 만듭니다.

 Scheduler 는 내부적으로 참조 개수( reference counting ) 을 사용하여, 수명을 관리합니다. 그래서 참조 개수가 0이 되면 Scheduler 가 소멸됩니다.

 

Scheduler::Create()

 현재 컨텍스트와 연결되지 않은 Scheduler 를 생성합니다. 참조 개수가 1로 설정됩니다.

 

Scheduler::Attach()

 현재 컨텍스트와 Scheduler 를 연결합니다. 참조 개수가 증가합니다.

 

Scheduler::Reference()

 참조 개수가 증가합니다.

 

Scheduler::Release()

 참조 개수가 감소합니다. 참조 개수가 0이 되면 소멸됩니다.

 

CurrentScheduler::Create()

 현재 컨텍스트와 연결된 Scheduler 를 생성합니다. 참조 개수가 1로 설정됩니다.

 

CurrentScheduler::Detach()

 현재 컨텍스트를 분리합니다. 참조 개수가 감소합니다. 참조 개수가 0이 되면 소멸됩니다.

 

생성과 소멸, 연결과 분리

 위와 같은 함수들을 제공하지만, 생성과 소멸, 연결과 분리가 짝을 이루어야 합니다.

 CurrentScheduler::Create() 로 생성하였다면, CurrentScheduler::Detach() 로 소멸시키는 것이 좋습니다.

 Scheduler::Create() 로 생성하고, Scheduler::Attach() 로 연결하였다면, Scheduler::Detach() 로 해제하고, Scheduler::Release() 로 소멸해야 합니다.

 만약 Scheduler::Reference() 를 통해 참조 개수를 증가시켰다면, Scheduler::Release() 를 사용하여 참조 개수를 감소시켜주어야 합니다.

 

소멸 시점 알림

 모든 작업이 끝나기 전에는 Scheduler 를 소멸시키지 않습니다. 언제 Scheduler 가 소멸되는지 알기 위해서는 RegisterShutdownEvent() 를 사용하여 Windows API 의 EVENT 객체를 지정해 주고, WaitForSingleObject() 를 사용하여 소멸될 때까지 대기할 수 있습니다.

 

그 외의 멤버 함수

 위에서 설명한 멤버 함수 이외에 제공하는 멤버 함수들을 알아보도록 하겠습니다.

 

CurrentScheduler

  • Get() – 현재 컨텍스트에 연결된 Scheduler 의 포인터를 반환합니다. 참조 개수가 증가하지 않습니다.
  • CreateScheduleGroup() -  ScheduleGroup 을 생성합니다.
  • ScheduleTask() – Scheduler 의 일정 큐에 간단한 작업을 추가합니다.
  • GetPolicy() – 연결된 정책의 복사본을 반환합니다.

 

Scheduler

  • CreateScheduleGroup() – ScheduleGroup 을 생성합니다.
  • ScheduleTask() – Scheduler 의 일정 큐에 간단한 작업을 추가합니다.
  • GetPolicy() – 연결된 정책의 복사본을 반환합니다.
  • SetDefaultSchedulePolicy() – 기본 Scheduler 에 적용될 정책을 설정합니다.
  • ResetDefaultSchedulePolicy() – 기본 Scheduler 의 정책을 SetDefaultSchedulerPolicy() 를 사용하기 전의 정책으로 설정합니다.

 

마치는 글

 이번 글에서는 Concurrency Runtime 의 Scheduler 에 대해서 알아보았습니다. 위의 설명만으로는 어떻게 사용해야 하는지, 어떤 기능을 하는지 알기 어렵습니다.

다음 글에서 위에서 소개해드린 멤버 함수들의 사용 방법과 활용 예제들에 대해서 알아보도록 하겠습니다.

- 뜬금없이 뭐여..?

지금까지는 닷넷 4.0에 추가된 TPL과 PLINQ를 통해서 멀티 스레드 프로그래밍을 하는 방법을 살펴봤습니다. 그러면, 잠깐 추억을 되살릴겸, 뭐가 어떻게 달라졌는지도 한번 비교해 볼겸 해서, 닷넷 3.5까지의 멀티 스레드 프로그래밍 방법을 잠깐 살펴보도록 하겠습니다. 호호호호


- Thread와 다이다이로 작업하던 시절.

TPL은 System.Threading.Tasks를 사용해서, ThreadPool을 내부적으로 사용한다고 말씀을 드렸었습니다. 하지만, 그것 닷넷 4.0이나, 닷넷 3.5에서는 Reactive Extension(Rx)을 통해서 추가적으로 지원하는 기능이구요. 그 이전에는 직접적으로 Thread나 ThreadPool을 이용해서 프로그래밍 해야 했습니다. 그럼 Thread를 직접 사용하던 코드를 예제로 한번 보시죠.

using System;
using System.Threading;

namespace Exam18
{
    class Program
    {
        static readonly int max = 10000;

        public static void PrintAsync()
        {
            for (int count = 0; count < max; count++)
            {
                Console.Write("|");
            }
            Console.WriteLine("추가 스레드 끝");
        }

        static void Main(string[] args)
        {
            ThreadStart threadStart = PrintAsync;
            Thread thread = new Thread(threadStart);

            //추가 스레드 시작
            thread.Start();

            //현재 작업중인 스레드에서도 반복문 시작
            for (int count = 0; count < max; count++)
            {
                Console.Write("-");
            }
            Console.WriteLine("메인 스레드 끝");

            //혹시 현재 스레드가 빨리 끝나더라도,
            //추가 스레드가 끝날 때 까지 기다리기.           
            thread.Join();
        }
    }
}

<코드1> Thread와 다이다이로.

<코드1>을 보면, 맨 처음에 Task를 소개해드리면서 사용했던 예제를 Thread를 사용하도록 바꾼 코드입니다. 차이점이 있다면, ThreadStart타입의 델리게이트를 사용해야 한다는 것과, Wait()메서드가 아니라 Join()메서드를 사용한다는 것이죠. 결과를 보시면, Task를 사용했던 것과 동일합니다.

---------|||||||-|||||||||||--------------|||||||||-------------|||||------|||||||||||---------
-||||||||--------|||||||||||||-----메인 스레드 끝||||||||||||||||||||||||||||||||||||||||||||||
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|||||||||||||||||||||||||||||||||||||||||||||||||||추가 스레드 끝
계속하려면 아무 키나 누르십시오 . . .
<결과1> Thread를 사용한 결과.

그리고 Thread를 보면, Task와 마찬가지로 실행을 제어할 수 있도록 몇가지 속성을 제공하는데요, 그 목록을 보면 아래와 같습니다.

 속성  설명 
 Join()  추가 스레드가 완료되기 전에 메인 스레드가 완료되면, 추가 스레드가 하던 작업은 다 날아간다. 그래서 추가 스레드의 작업이 완료될 때까지 메인 스레드가 기다리도록 한다.
 IsBackground  이 속성은 기본적으로 false이다. 즉, 스레드는 기본적으로 foreground작업인데, 그 때문에 스레드가 완료되기 전까지는 프로세스를 종료시킬 수 없다. 이 속성을 true로 주면, 스레드의 작업이 완료되기 전에도 프로세스를 종료시킬 수 있다.
 Priority  Join메서드를 사용한 경우에, 이 속성을 통해서 스레드의 우선순위를 바꿀 수 있다.
 ThreadState  이 속성을 통해서 스레드의 상태를 파악할 수 있는데, Aborted, AbortRequested, Background, Runnging, Stopped, StopRequested, Suspended, SuspendRequested, Unstarted, WaitSleepJoin등의 상태 값을 얻을 수 있다.
 Thread.Sleep()  현재 실행 중인 스레드의 실행을 명시한 시간만큼 일시정시 시키는 메서드이다.
 Abort()  이름 그대로, 스레드를 중지시키는 메서드. ThreadAbortException이 발생된다.
<표1> Thread의 속성.

위의 Thread멤버 중에서, Task에도 있는 건, Join()과 ThreadState뿐입니다. 왜 그럴까요? 일반적으로 권장되지 않는 것들이기 때문이죠. 그래서 닷넷 프레임워크 4.0으로 프로그래밍 할 때는, 위에서 언급한 것들 중에서 Task에 없는 속성들을 될 수 있으면 사용하지 말아야 합니다.


- ThreadPool을 사용해보자.

ThreadPool을 사용하면, 새로운 스레드를 계속 해서 생성하기 보다 기존에 있는 스레드를 재활용해서 추가적인 스레드 생성을 막을 수 있습니다. 참고로, TPL이 내부적으로 ThreadPool을 사용한다고 말씀드렸었죠? 그럼 ThreadPool을 사용하는 예제도 한번 보시죠.

using System;
using System.Threading;

namespace Exam19
{
    class Program
    {
        static readonly int max = 10000;

        public static void PrintAsync(object state)
        {
            for (int count = 0; count < max; count++)
            {
                Console.Write(state.ToString());
            }
            Console.WriteLine("추가 스레드 끝");
        }

        static void Main(string[] args)
        {
            ThreadPool.QueueUserWorkItem(PrintAsync, "|");
           
            //현재 작업중인 스레드에서도 반복문 시작
            for (int count = 0; count < max; count++)
            {
                Console.Write("-");
            }
            Console.WriteLine("메인 스레드 끝");

            //혹시 현재 스레드가 빨리 끝나더라도,
            //추가 스레드가 끝날 때 까지 기다리기.           
            Thread.Sleep(1000);
        }
    }
}

<코드2> ThreadPool을 사용한 코드.

<코드2>를 보시면, ThreadPool을 사용하고 있는데요. QueueUserWorkItem메서드를 통해서 작업을 추가하고 있습니다. 그러면, 자동으로 스레드를 활용해서 작업을 시작하게 되구요. 결과는 앞선 예제와 동일합니다. 그런데, ThreadPool을 사용할 때 장점만이 있는 건 아닌데요. 작성한 코드외에도 다른 라이브러리등에서 내부적으로 시간이 많이 걸리는 I/O작업 등에 ThreadPool을 사용한다면, 그 작업이 끝날 때까지 기다려야 하거나, 심한 경우에는 데드락이 발생하기도 합니다. 그리고 Thread나 Task를 사용할 때와는 다르게 ThreadPool은 실행 중인 작업에 접근할 수 있는 방법이 없습니다. 그래서 실행 중인 작업을 조종한다거나, 상태를 확인할 수 가 없죠. 그래서 <코드2>를 보시면, Join()이나 Wait()대신에, Thread.Sleep()메서드를 통해서 추가 스레드가 끝날 때까지 메인 스레드를 기다리게 합니다.


- 마치면서

오늘은 닷넷 3.5 까지의 멀티 스레드 프로그래밍 방법에 대해서 알아봤는데요. 크게 다른 모습은 없습니다. 다만, 좀 더 안전하고 간단한 방법을 제공하는 것이죠. 대한민국도 16강에 진출했는데 오늘은 여기까지 하시죠!...응??


- 참고자료

1. Essential C# 4.0, Mark Michaelis, Addison Wesley

- 인생이 원하는 대로 가지는 않더라.

우리는 인생을 살면서, 여러가지 계획을 세웁니다. 하지만, 보통 계획을 세울 때, 예외적인 상황을 감안하지 않는 경우가 많습니다. 그래서 늘 일정은 실패로 끝나게 되고, '아, 난 안되나 보다.'하고 절망하게 되는 것이죠. 자세한 내용은 최인철 교수님의 '프레임'을 참고하시면..... 순간 책 소개 코너로 빠져들뻔 했군요. 어헣.

아무튼, 우리는 인생에서 뿐만 아니라 프로그래밍에서도 생각외의 순간을 많이 만나게 됩니다. '도대체 어떤 **가 이런 거 까지 해볼까?'하는 생각으로 안이하게 프로그래밍을 하다보면 기상 천외한 버그 리포트를 받게 됩니다. 그래서 예외 처리가 중요한 것이죠. 오늘은 병렬 프로그래밍에서 예외 처리 하는 법에 대해서 간단하게 이야기를 해보려고 합니다.


- 차이점 하나.

기존의 프로그래밍에서는 그저 예외를 발생시 처리하고 싶은 구문을 try로 감싸면 되었는데요. 병렬 프로그래밍에서는 어떨까요? Task.Start()를 try로 감싸면 결과를 얻을 수 있을까요? 한번 실험해보죠.

using System;
using System.Threading.Tasks;

namespace Exam5
{
    class Program
    {
        static string Calc(object from)
        {
            long sum = 0;
            long start = (long)from;
            Console.WriteLine("현재 이 메서드를 실행중인 스레드 ID : {0}",
                Task.CurrentId);

            for (long i = start; i < 100000000; i++)
            {
                sum += i;
                if (i == 100000)
                {
                    //100000에 이르면, 그냥 예외를 던진다ㅋ.
                    throw new ApplicationException("그냥 에러가 났음");
                }
            }
            Console.WriteLine("계산 끝");
            return sum.ToString();
        }

        static void Main(string[] args)
        {
            Task<string> task = new Task<string>(
                Calc, 1L);

            try
            {
                task.Start();
            }
            catch (AggregateException ex)
            {
                foreach (var item in ex.InnerExceptions)
                {
                    Console.WriteLine("에러 : {0}", item.Message);
                }
            }

            Console.WriteLine(task.Result);
        }
    }
}

<코드1> Start를 try로 감싸기

<코드1>을 보시면, task.Start()를 try로 감싸고 있습니다. 과연 실행중에 던지는 예외를 잘 받을 수 있을까요? 결과는 아래와 같습니다.

현재 이 메서드를 실행중인 스레드 ID : 1

처리되지 않은 예외: System.AggregateException: 하나 이상의 오류가 발생했습니다.
---> System.ApplicationException: 그냥 에러가 났음
   위치: Exam5.Program.Calc(Object from) 파일 C:\Users\boram\Documents\Visual St
udio 10\Projects\Chapter9\Exam7\Program.cs:줄 21
   위치: System.Threading.Tasks.Task`1.InvokeFuture(Object futureAsObj)
   위치: System.Threading.Tasks.Task.InnerInvoke()
   위치: System.Threading.Tasks.Task.Execute()
   --- 내부 예외 스택 추적의 끝 ---
   위치: System.Threading.Tasks.Task.ThrowIfExceptional(Boolean includeTaskCance
ledExceptions)
   위치: System.Threading.Tasks.Task.Wait(Int32 millisecondsTimeout, Cancellatio
nToken cancellationToken)
   위치: System.Threading.Tasks.Task`1.get_Result()
   위치: Exam5.Program.Main(String[] args) 파일 C:\Users\boram\Documents\Visual
Studio 10\Projects\Chapter9\Exam7\Program.cs:줄 45
계속하려면 아무 키나 누르십시오 . . .

<결과1> <코드1>의 실행 결과

<결과1>을 보면, 예외는 전혀 잡히지 않았습니다. 왜 일까요? 작업에서 실행하는 코드(여기서는 Calc메서드)는 Start메서드 내에서 실행되는 게 아니라, Start메서드로 작업이 시작된 이후에야 별도로 시작되기 때문이죠. 그래서 Start메서드에 try를 걸어봤자 예외는 잡을 수 없습니다.

가만히 생각해보면, 작업 안에서 처리되는 예외(즉, Calc메서드 내에서 발생하고 처리되는 예외)는 전혀 고민할 필요가 없겠죠. 하지만, 처리되지 못한 예외가 발생하는 경우는 바깥에서 처리할 방법을 찾아야 합니다.

CLR 2.0 버전까지는 이런 처리되지 못한 예외가 발생하면, 예외가 버블링되면서, 상위 계층으로 전파되면서 결국에는 윈도우 에러 보고 대화상자를 열게 만들고, 프로그램은 종료되었습니다. 하지만, 작업 내부에서 처리되지 못한 예외라고 하더라도, 작업 바깥에서 처리할 수 있는 방법이 있다면, 프로그램이 종료되는 것 보다는 바깥에서 처리할 수 있게 하는 게 더 나은 방법이겠죠.

작업내부에서 처리 안된 예외(unhandled exception)가 발생했다면, 그 예외는 일단 작업을 마무리 짓는 멤버(Wait(), Result, Task.WaitAll(), Task.WaitAny())가 호출되기 전까지는 조용히 기다립니다. 그리고 마무리 멤버의 호출에서 처리 안된 예외가 발생하게 되는 것이죠. <코드1>에서 Start메서드를 try블록으로 감쌌지만, 예외를 잡을 수 없었던 이유가 바로 여기에 있습니다. 처리 안된 예외는 마무리 멤버와 함께 발생하기 때문이죠. 그러면, <코드1>을 수정해서, 예외를 제대로 잡도록 수정해보겠습니다.

task.Start();

try
{
    Console.WriteLine(task.Result);
}

catch (AggregateException ex)
{
    foreach (var item in ex.InnerExceptions)
    {
        Console.WriteLine("에러 : {0}", item.Message);
    }
}

<코드2> Result를 try로 감싸라.

<코드2>를 보면, 작업을 마무리 짓는 멤버 중의 하나인, Result를 try블록으로 감싸고 있습니다. 그리고 결과를 보면,

현재 이 메서드를 실행중인 스레드 ID : 1
에러 : 그냥 에러가 났음
계속하려면 아무 키나 누르십시오 . . .
<결과2> 제대로 처리된 예외.

예외가 제대로 처리 된 것을 확인할 수 있습니다.


- 차이점 둘.

혹시 <코드1>, <코드2>를 주의 깊게 보신 분이라면, 처음보는 예외 하나를 발견하셨을지도 모르겠습니다. 바로 AggregateException인데요, 이에 대해서 이야기를 좀 해보겠습니다.

AggregateException은 닷넷 프레임워크 4.0에서 처음 추가된 예외 타입인데요, MSDN의 설명을 보면(http://msdn.microsoft.com/en-us/library/system.aggregateexception.aspx), '프로그램 실행 중에 발생하는 하나 또는 여러개의 에러를 표현하는 수단'이며, 주로 TPL과 PLINQ에서 활용되고 있다고 합니다.

aggregate는 여러 개의 작은 것들을 서로 합치는 이미지를 갖고 있는데요, AggregateException은 그렇다면 여러 개의 예외를 하나로 합치는 예외타입이라는 말이 됩니다. 그런데, 여러 개의 예외는 어디서 오는 걸까요? 예전에 작성했던 예제를 조금 수정해서 확인 해보도록 하죠.

using System;
using System.Collections.Generic;
using System.Linq;
using System.IO;
using System.Threading.Tasks;

namespace Exam7
{
    class Program
    {
        static void Main(string[] args)
        {
            IEnumerable<string> files =
                Directory.GetFiles("C:\\음악", "*", SearchOption.AllDirectories);
            List<string> fileList = new List<string>();

            Console.WriteLine("파일 개수 : {0}", files.Count());
           
            Parallel.ForEach(files, (file) =>
            {
                FileInfo fileInfo = new FileInfo(file);
                if (fileInfo.Exists)
                {
                    if (fileInfo.Length >= 15000000)
                    {
                        throw new ApplicationException("15메가 넘는 파일이!!");
                    }
                    else if (fileInfo.Length >= 1000000)
                    {
                        fileList.Add(fileInfo.Name);
                    }
                    Console.Write("{0}", Task.CurrentId.ToString());                   
                }
            });
        }
    }
}

<코드3> 약간 수정된 예제.

<코드3>이 바로 그 예제인데요, 예제를 보면, 음악 폴더에서 15메가 넘는 파일이 발견되면, 예외를 발생하도록 되어있습니다. flac같은 파일로 보자면, 15메가 넘는 파일은 흔히 있겠지만, 저는 서민이라 mp3를 선호합니다. 어헣-_-. 아무튼, 이 예제를 실행 시켜보면요, 간혹 15메가 넘는 파일이 몇개는 있기 마련이기 때문에, 실행 중에 에러가 나게 되어 있습니다. 한번 디버깅을 해보죠.


<그림1> 병렬 스택 창

<그림1>의 병렬 스택을 보시면요, 병렬 ForEach문에 의해서 4개의 작업자 스레드가 생성된 걸 확인할 수 있습니다.(리스트의 크기나, CPU자원 상태등에 따라서 개수는 계속해서 변합니다.)

<그림2> 병렬 작업 창

그리고 <그림2>의 병렬 작업창을 보면, 2번 스레드가 15메가 넘는 파일을 만난 것을 확인할 수 있습니다. 그러면, 2번 스레드가 예외를 던질텐데, 나머지 작업은 어떻게 될까요? 처리 되지 않은 예외가 발생하는 순간, 다른 작업들은 모두 날아가버립니다.

그런데, 각 스레드 별로 파일 리스트를 쪼개서 줬을 텐데요. 각 스레드가 각자의 목록을 가지고 작업을 하다보면, 각 스레드 별로 15메가가 넘는 파일을 발견하게 될 것입니다. 이런 예외들을 만날 때 마다 처리하지 말고, 모두 모아서 한번에 처리하려면 어떻게 해야 할까요? 그래서 바로 AggregateException을 사용하는 거죠.

using System;
using System.Collections.Generic;
using System.Linq;
using System.IO;
using System.Threading.Tasks;
using System.Collections.Concurrent;

namespace Exam8
{
    class Program
    {
        static void Main(string[] args)
        {
            IEnumerable<string> files =
                Directory.GetFiles("C:\\음악", "*", SearchOption.AllDirectories);
            List<string> fileList = new List<string>();

            Console.WriteLine("파일 개수 : {0}", files.Count());

            var exceptions = new ConcurrentQueue<Exception>();

            try
            {
                Parallel.ForEach(files, (file) =>
                {
                    FileInfo fileInfo = new FileInfo(file);
                    if (fileInfo.Exists)
                    {
                        try
                        {
                            if (fileInfo.Length >= 15000000)
                            {
                                throw new ApplicationException("15메가 넘는 파일이!!");
                            }
                            else if (fileInfo.Length >= 1000000)
                            {
                                fileList.Add(fileInfo.Name);
                            }
                            Console.Write("{0}", Task.CurrentId.ToString());
                        }
                        catch (Exception ex)
                        {
                            exceptions.Enqueue(ex);
                        }
                    }
                });

                throw new AggregateException(exceptions);
            }
            catch (AggregateException ex)
            {
                foreach (var item in ex.InnerExceptions)
                {
                    Console.WriteLine("\n에러 : {0}", item.Message);
                }

                Console.Write("\n파일 리스트 계속해서 보기(엔터키를 치세요)");
                Console.ReadLine();
            }
            finally
            {
                foreach (string file in fileList)
                {
                    Console.WriteLine(file);
                }

                Console.WriteLine("총 파일 개수 : {0}", fileList.Count());
            }
        }
    }
}

<코드4> AggregateException을 사용.

ConcurrentQueue는 Queue긴 하지만, 여러 스레드에 의해서 동시에 큐에 추가를 하거나 해도 안전하도록 만들어진(thread-safe) Queue입니다. 그 큐에서 예외가 발생할 때마다, 예외를 저장해뒀다가, 한꺼번에 AggregateException으로 던지는 것이죠. 그리고 바깥쪽의 catch블록에서 예외를 받아서 내부의 예외 목록을 하나씩 처리하는 것입니다.

(생략)
에러 : 15메가 넘는 파일이!!

에러 : 15메가 넘는 파일이!!

에러 : 15메가 넘는 파일이!!

에러 : 15메가 넘는 파일이!!

에러 : 15메가 넘는 파일이!!

에러 : 15메가 넘는 파일이!!

에러 : 15메가 넘는 파일이!!

에러 : 15메가 넘는 파일이!!

에러 : 15메가 넘는 파일이!!

에러 : 15메가 넘는 파일이!!

에러 : 15메가 넘는 파일이!!

에러 : 15메가 넘는 파일이!!

에러 : 15메가 넘는 파일이!!

에러 : 15메가 넘는 파일이!!

파일 리스트 계속해서 보기(엔터키를 치세요)

<결과 3> 처리과정에서 생긴 예외를 모두 모아서 처리.


- 마무리.

오늘 예외처리에 대해서 봤습니다. 다음은~? 작업을 연쇄적으로 처리할 수 있도록 하는 부분을 보겠습니돠. 그럼 평안하시길. 어허허허허헣.


- 참고자료

1. Essential C# 4.0, Mark Michaelis, Addison Wesley
2. http://msdn.microsoft.com/en-us/library/dd460695.aspx
3. http://msdn.microsoft.com/en-us/magazine/ee321571.aspx
4. http://msdn.microsoft.com/en-us/library/system.aggregateexception.aspx

Welcome to Parallel C#(3) - 작업의 기본.

C# Parallel Programming 2010. 5. 31. 09:00 Posted by 알 수 없는 사용자
- 작업해본 적이나 있수?

물론이죠-_-;; 이 나이에 작업해 본 적도 없으면, 마법사 정도가 아니라, 신이 됐겠죠. 어헣. 오늘의 작업은 그 작업은 아니고... 스레드와 관련된 작업입니다. 부디 오해 없으시길 바라고, 작업의 기본은 다른 연예 서적에서 얻으시길.


- Task 시작하기.

지난 포스트에서 했던 예제를 한번 돌아보겠습니다.

using System;
using System.Threading.Tasks;

namespace Exam2
{
    class Program
    {
        static void Main(string[] args)
        {
            const int max = 10000;

            //현재 작업중인 스레드외에 추가로 스레드를 생성
            Task task = new Task(() =>
                {
                    for (int count = 0; count < max; count++)
                    {
                        Console.Write("|");
                    }
                });

            //추가 스레드 시작
            task.Start();

            //현재 작업중인 스레드에서도 반복문 시작
            for (int count = 0; count < max; count++)
            {
                Console.Write("-");
            }

            //혹시 현재 스레드가 빨리 끝나더라도,
            //추가 스레드가 끝날 때 까지 기다리기.           
            task.Wait();
        }
    }
}

<코드1>

Main메서드를 실행하는 스레드 외에 또 하나의 스레드를 추가로 생성해서, 두 개의 스레드로 화면에 다른 문자열을 출력하는 예제였죠. 이 예제를 보면, Task라는 클래스를 사용하고 있습니다. 이 클래스는 닷넷 프레임워크 4.0에 새롭게 추가된 클래스인데요. 기존의 멀티스레드 프로그래밍을 한 단계 높은 추상화를 통해서 프로그래머의 실수를 줄이고, 좀 더 직관적인 코드를 작성할 수 있게 해주는 TPL(Task Parallel Library)에 포함되어서 추가된 클래스입니다. 중심에 있는 클래스라고 볼 수 있죠.

Task클래스는 관리되지 않는 스레드를 한단계 감싸서 추상화를 시킨 클래스입니다. 내부적으로 스레드 풀을 사용하는 데요, 내부적으로는 System.Threading.ThreadPool을 사용해서 요청에 따라 스레드를 새로 생성하거나, 이미 생성된 스레드를 재활용해서 부하를 줄입니다.

새로운 Task가 실행할 동작은 델리게이트를 통해서 명시해주는데요, <코드1>에서 굵게 처리된 부분이 바로 그 부분입니다. 카운트에 따라서 문자열을 출력하는 델리게이트를 생성자에 넘겨주고 있는 거죠. 물론, 이렇게 델리게이트를 넘겨준다고 해서 바로 스레드가 실행되는 건 아닙니다. Start메서드를 통해서 실행을 해줘야만 스레드가 실행되는 것이죠.


- Task가 끝날 때?

그러면, <코드1>은 두개의 스레드가 실행이 되면서 서로 다른 문자열을 번갈아 가면서 출력하겠죠. 여기서 한가지 생각해볼게 있습니다. 콘솔 어플리케이션은 Main메서드의 실행으로 시작하고, Main메서드의 끝과 함께 종료됩니다. 그렇다면, Main메서드의 실행을 맡은 스레드가 종료되었는데, 추가로 생성한 스레드의 작업이 안끝났다면 어떤 일이 벌어질까요?

//현재 작업중인 스레드외에 추가로 스레드를 생성
Task task = new Task(() =>
    {
        for (int count = 0; count < max; count++)
        {
            Console.Write("|");
        }
        Console.WriteLine("추가 스레드 끝");
    });

//추가 스레드 시작
task.Start();

//현재 작업중인 스레드에서도 반복문 시작
for (int count = 0; count < max; count++)
{
    Console.Write("-");               
}
Console.WriteLine("메인 스레드 끝");

//혹시 현재 스레드가 빨리 끝나더라도,
//추가 스레드가 끝날 때 까지 기다리기.           
//task.Wait();

<코드2>

<코드1>을 <코드2>와 같이 수정한 다음에 실행해보죠. 그러면, 둘 중의 어떤 스레드가 빨리 끝날까요? 그건 그때 그때 다릅니다-_- 그래서 아래와 같은 두 경우가 생길 수 있죠.

||||-----||||||||--------------|||||||||||||-------------||||||||||||-----------||||||---------------||||||||||||------------|||||||||||-----------||||||||||||--|||||||||||||-------|||||||||||-------------||||||||||||--------------||||||||||----------------|||---------------||||||||||||||-----||-------------||||||||||||----------------|||||||||||||-------------||||||||||||-------------|||---|------||||||||||||||-----------------|---------------||||||||||||||---------------메인 스레드 끝
|계속하려면 아무 키나 누르십시오 . . .
<결과1> 메인스레드가 먼저 끝나는 경우

|----------------|||||||||||||-----------||||||||||||||---------||||||||||||-----||||||||||||||||---------------||||||||||||||--------------||-----------||||||||------------|||||||||||||||-------------|--------------|||||||-------------|||||---||추가 스레드 끝----------------------------------------------------------------------------------------------------------------------------------------------------------메인스레드 끝
계속하려면 아무 키나 누르십시오 . . .
<결과2> 추가 스레드가 먼저 끝나는 경우

<결과2>는 추가 스레드가 먼저 끝나면서 모든 결과가 출력이 되었지만, <결과1>은 Main메서드의 실행을 맡은 메인 스레드가 먼저 끝나면서 프로그램이 종료되었고, 따라서 추가스레드의 나머지 결과는 날아가 버렸습니다. Wait메서드는 메인 스레드가 먼저 끝나더라도, 추가 스레드가 끝날 때까지 기다리게 하는 역할을 합니다. 그래서 메인 스레드가 빨리 끝나더라도, 항상 추가 스레드의 결과까지 제대로 출력되게 되는 것이죠.

||||||||--------|||||||||||||||--------------|||||||||||||------------|||||||||||||--------||||
||-메인 스레드 끝||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||추가 스레드 끝
계속하려면 아무 키나 누르십시오 . . .
<결과3> Wait메서드를 사용한 경우


- 가는 길은 여러갈래.

앞에서 스레드의 시작은 Start메서드를 통한다고 말씀 드렸지만, 항상 그런 것은 아닙니다. 생성과 동시에 실행을 시킬 수도 있습니다.

Task task = Task.Factory.StartNew(() =>
                {
                    for (int count = 0; count < max; count++)
                    {
                        Console.Write("|");
                    }
                    Console.WriteLine("추가 스레드 끝");
                });
<코드3> 생성과 동시에 스레드 시작

<코드1>의 Task생성 부분을 <코드3>과 같이 수정하고, Start메서드 호출부분을 주석처리하면, 동일한 결과를 얻을 수 있습니다.

그리고 <코드1>에서는 메인스레드가 추가 스레드가 끝날 때까지 기다리기 위해서 Wait메서드를 사용했지만, 다른 방법도 있습니다. 만약에 추가 스레드에 입력된 델리게이트가 결과값을 반환하고, 메인 스레드에서 그 결과값을 사용해야 한다면, 메인 스레드는 추가 스레드의 작업이 끝나서 결과가 나올 때까지 기다립니다. Wait메서드 없이도 말이죠. 어찌보면 당연한 이야기죠 ㅋ

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace Exam3
{
    class Program
    {
        static void Main(string[] args)
        {
            Task<string> task = Task.Factory.StartNew<string>(
                () =>
                {
                    long sum = 0;
                    for (long i = 0; i < 100000000; i++)
                    {
                        sum += i;
                    }
                    return sum.ToString();
                });

            foreach (char busySymbol in BusySymbols())
            {
                if (task.IsCompleted)
                {
                    Console.WriteLine('\b');
                    break;
                }
                Console.WriteLine(busySymbol);
            }

            Console.WriteLine();
            //여기서 추가 스레드가 끝날 때 까지 기다린다.
            Console.WriteLine(task.Result);
            System.Diagnostics.Trace.Assert(
                task.IsCompleted);
        }

        private static IEnumerable<char> BusySymbols()
        {
            string busySymbols = @"-\|/-\|/";
            int next = 0;
            while (true)
            {
                yield return busySymbols[next];
                next = (++next) % busySymbols.Length;
                yield return '\b';
            }
        }
    }
}

<코드 4> 결과 기다리기.

<코드4>는 추가 스레드의 결과를 계속 기다리다가, 결과가 나오는 순간, 출력하고 끝납니다.


- 멀티스레드 쉽고만?

이라고 생각하시면 곤란하구요-_-;; 열심히 공부 중인데, 역시 어렵습니다. 다만, Task클래스가 상당히 많은 부분을 간소화 시켜 주기 때문에, 한 층 더 편해진 느낌이랄까요? 오늘 여기까지!


- 참고자료

1. Essential C# 4.0, Mark Michaelis, Addison Wesley