Asynchronous Agents Library - agent. 1 ( 상태 )

VC++ 10 Concurrency Runtime 2010. 6. 5. 08:00 Posted by 알 수 없는 사용자

Asynchronous Agents Library – agent. 1 ( 상태 )

작성자: 임준환( mumbi at daum dot net )

 

Agent 란 무엇인가

개념

 Agent 는 사전적 의미로 동작이나 행위를 행하는 사람이나 사물이라는 뜻을 가지고 있습니다. 그 말 그대로 Asynchronous Agents Library( 이하 AAL ) 의 agent 는 어떤 행위를 하는 객체를 나타냅니다.

 지난 글에서 AAL 이 actor-based programming 이라고 언급한 적이 있습니다. 여기에서 actor 가 바로 agent 를 일컫습니다.

 이해를 돕기 위해 비유를 하자면 agent 는 하나의 작업이라고 생각해도 좋습니다. 그런데 이 작업은 단순히 한 번 처리되는 작업을 말하는 것이 아니라, 어떤 책임이나 역할을 하는 작업입니다. 우리는 멀티 스레드 프로그래밍을 할 때, 이런 개념을 worker 라고 말하기도 합니다.

 예를 들어, 네트워크 프로그래밍을 할 때, 소켓에 연결 요청을 기다리고 요청이 들어오면 연결해주는 역할을 하는 스레드가 필요합니다. 이 스레드는 어떤 책임이나 역할을 하는 작업을 가지고 있습니다. 이 스레드를 객체로 표현하면 agent 라고 할 수 있습니다.

클래스

 실제 개발 시 필요한 agent 클래스는 Concurrency 네임스페이스 안에 존재하고, agents.h 파일 안에 정의되어 있습니다.

 agent 클래스는 추상 클래스로 agent 로 만들 클래스가 상속해야 합니다. agent 클래스의 추상 메소드는 void run() 이고, 반드시 구현해야 합니다.

- 예

#include <iostream>
#include <agents.h>

using namespace std;
using namespace Concurrency;

class TestAgent
	: public agent
{
protected:
	void run()
	{
		wcout << L"running.." << endl;
		this->done();
	}
};

[ 코드1. agent 상속 예 ]

 

agent 의 상태

 agent 는 비 동기 처리를 목적으로 하기 때문에, 내부적으로 스레드를 사용할 수 밖에 없습니다. agent 의 상태란 agent 의 작업을 처리하는 스레드의 상태라고 볼 수 있습니다.

 agent 는 생명 주기를 갖는데, 이 주기는 상태로 표현됩니다.

[ 그림1. agent 샐명 주기 ]

[ 그림1. agent 생명 주기 ]


 처음 agent 상태는 크게 초기 상태, 활성화 상태, 종료 상태로 나눌 수 있습니다. agent 객체를 생성하면 그 agent 는 초기 상태인 created 상태가 됩니다. start() 를 호출하면 Concurrency Runtime 에 의해 해당 agent 가 스케줄 되고 runnable 상태가 됩니다.

 createdrunnable 상태에서 cancel() 을 호출하여 작업을 취소하면 canceled 상태가 되는데 이는 종료 상태 중 하나입니다.

 위의 그림에서 점선으로 표시된 run() 은 우리가 명시적으로 호출하는 것이 아니라 runnable 상태인 agent 를 Concurrency Runtime 이 호출하는 것을 의미합니다. run() 이 호출되면 활성화 상태인 started 상태가 됩니다.

 만약 모든 작업의 수행이 완료되었으면 done() 함수를 호출하여 종료 상태인 done 상태로 바꾸어야 합니다. 동기화를 위한 wait() 등의 함수는 해당 agent 가 종료 상태 즉, done 이나 canceled 상태가 되어야 반환됩니다.

 agent 생명 주기에서 가장 중요한 것은 순환되지 않는다는 것입니다. 즉, 되돌아 갈 수 없습니다. 한 번 종료 상태를 갖은 agent 객체는 이미 쓸모 없는 객체가 되고 재사용할 수 없습니다. 그러므로 해당 작업을 다시 수행하기 위해서는 새로운 agent 객체를 생성해야 합니다.

 다음은 각 상태에 대한 정의입니다.

 agent 상태  설명
 agent_created  agent 가 아직 스케줄 되지 않음.
 agent_runnable  Concurrency Runtime 이 agent 를 스케줄 하고 있음.
 agent_started  agent 가 실행 중임.
 agent_done  agent 의 작업이 완료됨.
 agent_canceled  agent 가 실행 되기 전에 취소됨.

[ 표1. agent status ]

 이 상태들은 status() 메소드로 알아 낼 수 있습니다. status() 는 동기화되기 때문에 정확한 상태를 반환하지만, 그 상태가 현재의 상태라고 보장할 수 없습니다. 왜냐하면 status() 가 반환된 직 후 agent 의 상태가 바뀔 수 있기 때문입니다.

- 예

코드

#include <iostream>
#include <agents.h>

using namespace std;
using namespace Concurrency;

class TestAgent
	: public agent
{
protected:
	void run()
	{
		wcout << L"running.." << endl;
		this->done();
	}
};

void print_agent_status( agent& a )
{
	wstring status;
	switch( a.status() )
	{
	case agent_status::agent_created:	status = L"agent_created";	break;
	case agent_status::agent_runnable:	status = L"agent_runnable";	break;
	case agent_status::agent_started:	status = L"agent_started";	break;
	case agent_status::agent_done:		status = L"agent_done";		break;
	case agent_status::agent_canceled:	status = L"agent_canceled";	break;
	}

	wcout << status.c_str() << endl;
}

int main()
{
	TestAgent testAgent;

	print_agent_status( testAgent );

	testAgent.start();

	for( int i = 0; i < 10; ++i )
	{
		print_agent_status( testAgent );
	}

	agent::wait( &testAgent );

	print_agent_status( testAgent );
}

[ 코드2. agent 생명 주기 ]

실행 결과 

[ 그림2. 코드2 실행 결과 ]

[ 그림2. 코드2 실행 결과 ]

 

마치는 글

 이번 글에서는 agent 의 개념과 클래스, 그리고 상태에 대해서 알아보았습니다. agent 를 사용하기 위해서는 조금 더 알아야 할 것들이 있습니다.

 조금 더 알아야 할 내용들은 다음 글에 작성해 보도록 하겠습니다.

 

참고

  • 그림1. agent 생명 주기 - http://i.msdn.microsoft.com/dynimg/IC338844.png

Asynchronous Agents Library 소개

VC++ 10 Concurrency Runtime 2010. 5. 29. 09:00 Posted by 알 수 없는 사용자

Asynchronous Agents Library 소개

작성자: 임준환( mumbi at daum dot net )

 

Concurrency Runtime 의 컴포넌트

[ 그림1. Concurrency Runtime Architecture ]

[ 그림1. Concurrency Runtime Architecture ]


 Asynchronous Agents Library 는 위 그림에서 보듯이 Concurrency Runtime 프레임워크 안에서 돌아가는 내부 컴포넌트 중 하나입니다.

 라이브러리라는 명칭에서 독립적으로 수행될 것 같은 오해를 가질 수 있으나, 사실은 Concurrency Runtime 의 Task Scheduler 와 Resource Manager 를 기반으로 만들어졌습니다.

 Concurrency Runtime 에서 Task Scheduler 와 Resource Manager 를 하위 레벨 컴포넌트라고 본다면, Asynchronouse Agent Library( 이하, AAL ) 와 Parallel Patterns Library( 이하, PPL ) 은 상위 레벨 컴포넌트라고 볼 수 있습니다. 다시 말해, AAL, PPL 모두 Task Scheduler 와 Resource Manager 를 사용한다는 말입니다.

 Concurrency Runtime 이 아닌 다른 프레임워크에서도 하위 레벨보다 상위 레벨 컴포넌트가 사용 용이성과 안정성이 높은 것처럼 Concurrency Runtime 에서도 마찬가지입니다.

 Concurrency Runtime 개발자들은 비 동기 작업들을 하기 위해서는 AAL, 단위 작업의 병렬 처리를 하기 위해서는 PPL 을 사용하기를 권하고, 좀 더 특별한 최적화나 하위 레벨 작업이 필요한 경우에만 직접 Task Scheduler 를 사용하기를 권하고 있습니다.

 즉, AAL 은 Concurrency Runtime 의 비 동기 작업을 위한 인터페이스라고 볼 수 있습니다.( PPL 은 병렬 처리를 위한 인터페이스라고 볼 수 있습니다. )

 

비 동기 작업

 비 동기 작업이란 작업이 끝날 때까지 기다리지 않는다는 말입니다. 즉, 어떤 함수가 있을 때, 그 함수는 바로 반환되어야 합니다. 바로 반환되면 호출한 스레드는 바로 다른 작업을 수행할 수 있습니다. 이 때, 해당 함수의 작업은 호출한 스레드가 아니라 다른 작업 스레드에서 수행되고, 작업이 완료되면 호출한 스레드에게 통지하거나, 콜백( call back ) 함수를 호출합니다.

 이러한 처리 방식을 비 동기 처리 방식이라고 합니다. AAL 은 이런 비 동기 작업을 용이하게 해주는 라이브러리입니다.

 

기능

 AAL 은 크게 두 가지 특징을 가지고 있습니다. 하나는 actor-based 프로그래밍 모델이고, 다른 하나는 message passing 프로그래밍 모델입니다.

Agent

 Agent 는 AAL 의 기능 중 하나인 actor-based 프로그래밍 모델의 actor 를 나타내는 개념입니다. 인공 지능 등과 같은 다른 분야에서 사용하는 개념과 마찬가지로 어떤 임무를 수행하는, 즉 어떤 역할을 하는 객체를 일컫습니다.

 예를 들어, 스마트 폰이 만들어 지려면 케이스( case ) 생산, 하드웨어 생산, 케이스와 하드웨어 조립, OS 및 소프트웨어 설치, 테스트와 같은 공정이 필요한데, 각 공정들을 수행하는 객체들을 agent 라고 볼 수 있습니다.

Message

 Message 는 agent 들 간의 통신을 위한 메커니즘입니다. AAL 에서의 message 란 상황이나 상태를 알리기 위한 수단이 될 수도 있고, 실제 데이터를 전송하기 위한 수단이 될 수도 있습니다.

 예를 들면, 스마트 폰 공정들 사이에 케이스가 생산되었다면 생산된 케이스를 조립하는 곳으로 전달해야 합니다. 이 때, 케이스를 전달하는 수단으로 message 가 사용될 수 있습니다. 또한 케이스와 하드웨어의 조립, OS 및 소프트웨어 설치가 완료되어야지 테스트를 진행할 수 있습니다. 이런 완료 상태와 같은 작업의 상태를 알리기 위해서 message 가 사용될 수 있습니다.

 

예제

 위에 언급한 스마트 폰 생산 과정을 AAL 을 이용하여 구현해보았습니다. 예제가 비교적 긴 편이지만 최대한 이해하기 쉽게 직관적인 코드를 작성했습니다. AAL 을 중점적으로 소개하기 위해 디자인이나 테크닉은 무시하고 작성하였습니다.

 전체적인 흐름을 이해하는 데에는 어려움이 없을 것 같으나 사용된 함수들이 어떤 역할을 하는지 궁금할 것입니다.

시나리오

  1.  스마트 폰 케이스 생산자와 하드웨어 생산자가 각각 케이스와 하드웨어를 부품으로 생산한다.
  2.  생산된 부품을 조립하는 객체에게 전달하면 조립하는 객체가 조립하게 된다.
  3.  조립된 스마트 폰은 소프트웨어 설치 객체에게 전달되고, 그 객체는 소프트웨어를 설치한다.
  4.  소프트웨어가 설치된 스마트 폰은 테스터에게 전달되고, 테스터는 테스트에 성공한 스마트 폰을 제품 컨테이너에 저장한다.
  5.  제품 컨테이너의 제품들을 출력하고 종료한다.

[ 그림2. 예제 시나리오 ]

[ 그림2. 예제 시나리오 ]


코드

// 스마트폰 생산 예제 코드
#include <iostream>
#include <agents.h>
#include <ppl.h>

using namespace std;
using namespace Concurrency;

// 케이스 클래스
class Case
{
};

// 하드웨어 클래스
class Hardware
{
};

// 소프트웨어 클래스
class Software
{	
};

// 스마트폰 클래스
class SmartPhone
{
public:
	int			uid;

	Case*		pCase;
	Hardware*	pHardware;
	Software*	pSoftware;

	~SmartPhone()
	{
		if( 0 != this->pCase )
		{
			delete this->pCase;
			this->pCase = 0;
		}

		if( 0 != this->pHardware )
		{
			delete this->pHardware;
			this->pCase = 0;
		}

		if( 0 != this->pSoftware )
		{
			delete this->pSoftware;
			this->pSoftware = 0;
		}		
	}

	SmartPhone( int _uid, Case* _pCase = 0, Hardware* _pHardware = 0, Software* _pSoftware = 0 )
		: uid( _uid )
		, pCase( _pCase )
		, pHardware( _pHardware )
		, pSoftware( 0 )
	{
		if( 0 != _pSoftware )
			this->pSoftware = new Software( *_pSoftware );		
	}		

	void SetSoftware( Software* _pSoftware )
	{
		if( 0 != this->pSoftware )
			delete this->pSoftware;

		this->pSoftware = new Software( *_pSoftware );
	}
};

// 메시지 버퍼 typedef
typedef unbounded_buffer< Case* >		CaseBuffer;
typedef unbounded_buffer< Hardware* >	HardwareBuffer;
typedef unbounded_buffer< SmartPhone* >	SmartPhoneBuffer;

typedef vector< SmartPhone* >			SmartPhoneVector;

// 케이스 생산자 agent
class CaseProducer
	: public agent
{
private:
	unsigned int	caseCountToCreate;
	CaseBuffer&		caseBuffer;	

protected:
	void run()
	{
		// 필요한 개수 만큼 케이스를 생산하고 케이스 버퍼로 보낸다.
		for( unsigned int i = 0; i < this->caseCountToCreate; ++i )
		{
			send( this->caseBuffer, new Case );
			wcout << i << L": created a case." << endl;
		}

		// 모두 보냈으면 생산을 완료했다는 메시지로 널( 0 ) 포인터를 보낸다.
		send( this->caseBuffer, static_cast< Case* >( 0 ) );

		done();
	}

public:	
	CaseProducer( unsigned int _caseCountToCreate, CaseBuffer& _caseBuffer )
		: caseBuffer( _caseBuffer )
		, caseCountToCreate( _caseCountToCreate ) { }	
};

// 하드웨어 생산자 agent
class HardwareProducer
	: public agent
{	
private:
	unsigned int		hardwareCountToCreate;
	HardwareBuffer&		hardwareBuffer;

protected:
	void run()
	{
		// 필요한 개수 만큼 하드웨어를 생산하고 하드웨어 버퍼로 보낸다.
		for( unsigned int i = 0; i < this->hardwareCountToCreate; ++i )
		{
			send( this->hardwareBuffer, new Hardware );
			wcout << i << L": created a hardware." << endl;
		}

		// 모두 보냈으면 생산을 완료했다는 메시지로 널( 0 ) 포인터를 보낸다.
		send( this->hardwareBuffer, static_cast< Hardware* >( 0 ) );

		done();		
	}

public:
	HardwareProducer( unsigned int _hardwareCountToCreate, HardwareBuffer& _hardwareBuffer )
		: hardwareCountToCreate( _hardwareCountToCreate )
		, hardwareBuffer( _hardwareBuffer ) { }
};

// 부품( 케이스와 하드웨어)을 조립하는 agent
class Assembler
	: public agent
{
private:
	CaseBuffer&			caseBuffer;
	HardwareBuffer&		hardwareBuffer;

	SmartPhoneBuffer&	incompletedSmartPhoneBuffer;

protected:
	void run()
	{
		unsigned int loopCount = 0;

		// 버퍼에 쌓인 부품( 케이스와 하드웨어 )을 꺼내 조립하여 스마트폰을 만든다.
		while( true )
		{
			Case* pCase = receive( this->caseBuffer );
			Hardware* pHardware = receive( this->hardwareBuffer );

			// 더 이상 조립할 부품( 케이스 또는 하드웨어 )들이 없으면, 
			// 스마트폰 생산이 완료되었음을 알리는 메시지로 널( 0 ) 포인터를 보낸다.
			if( 0 == pCase || 0 == pHardware )
			{
				// 남은 케이스를 파괴한다.
				if( 0 != pCase )
				{
					while( true )
					{
						Case* pGarbageCase = receive( this->caseBuffer );

						if( 0 == pGarbageCase )
							break;

						delete pGarbageCase;
					}					
				}

				// 남은 하드웨어를 파괴한다.
				if( 0 != pHardware )
				{
					while( true )
					{
						Hardware* pGarbageHardware = receive( this->hardwareBuffer );

						if( 0 == pGarbageHardware )
							break;

						delete pGarbageHardware;
					}
				}

				send( this->incompletedSmartPhoneBuffer, static_cast< SmartPhone* >( 0 ) );
				break;
			}

			send( this->incompletedSmartPhoneBuffer,
				new SmartPhone( loopCount + 1, pCase, pHardware ) );

			wcout << loopCount << L": created a smart phone." << endl;

			++loopCount;
		}
		
		done();
	}

public:
	Assembler( CaseBuffer& _caseBuffer, HardwareBuffer& _hardwareBuffer, 
		SmartPhoneBuffer& _incompletedSmartPhoneBuffer )
		: caseBuffer( _caseBuffer )
		, hardwareBuffer( _hardwareBuffer )
		, incompletedSmartPhoneBuffer( _incompletedSmartPhoneBuffer ) { }	
};

// 소프트웨어 설치 agent
class SoftwareInstaller
	: public agent
{	
private:
	Software&			software;	

	SmartPhoneBuffer&	incompletedSmartPhoneBuffer;
	SmartPhoneBuffer&	completedSmartPhoneBuffer;

	SmartPhoneVector&	faultySmartPhoneArray;

protected:
	void run()
	{
		unsigned int loopCount = 0;

		// 조립된 스마트폰에 소프트웨어를 설치한다.
		while( true )
		{
			SmartPhone* pSmartPhone = receive( this->incompletedSmartPhoneBuffer );

			// 더 이상 소프트웨어를 설치할 조립된 스마트폰이 없으면, 설치를 중단하고
			// 소프트웨어 설치도 모두 완료되었음을 알리는 메시지로 널( 0 ) 포인터를 보낸다.
			if( 0 == pSmartPhone )
			{
				send( this->completedSmartPhoneBuffer, static_cast< SmartPhone* >( 0 ) );
				break;
			}

			wcout << loopCount;

			// 제대로 조립되었는지 판단 후, 소프트웨어를 설치한다. 
			if( 0 != pSmartPhone->pCase && 0 != pSmartPhone->pHardware )
			{
				pSmartPhone->SetSoftware( &this->software );

				send( this->completedSmartPhoneBuffer, pSmartPhone );
				wcout << L": installed the software." << endl;
			}
			else
			{
				this->faultySmartPhoneArray.push_back( pSmartPhone );
				wcout << L": failed to install the software." << endl;
			}

			++loopCount;
		}		

		done();
	}

public:
	SoftwareInstaller( Software& _software, SmartPhoneBuffer& _incompletedSmartPhoneBuffer,
		SmartPhoneBuffer& _completedSmartPhoneBuffer, SmartPhoneVector& _faultySmartPhoneArray )
		: software( _software )
		, incompletedSmartPhoneBuffer( _incompletedSmartPhoneBuffer )
		, completedSmartPhoneBuffer( _completedSmartPhoneBuffer )
		, faultySmartPhoneArray( _faultySmartPhoneArray ) { }
};

// 테스트하는 agent
class Tester
	: public agent
{
private:
	SmartPhoneBuffer&	completedSmartPhoneBuffer;

	SmartPhoneVector&	productArray;
	SmartPhoneVector&	faultySmartPhoneArray;

protected:
	void run()
	{
		unsigned int loopCount = 0;

		// 조립된 스마트폰에 소프트웨어 설치가 완료된 스마트폰을 테스트한다.
		while( true )
		{
			SmartPhone* pSmartPhone = receive( this->completedSmartPhoneBuffer );

			// 더 이상 테스트할 스마트폰이 없으면 중단한다.
			if( 0 == pSmartPhone )				
				break;

			wcout << loopCount;

			if( this->Test( pSmartPhone ) )
			{
				this->productArray.push_back( pSmartPhone );
				wcout << L": succeeded in testing." << endl;
			}
			else
			{
				this->faultySmartPhoneArray.push_back( pSmartPhone );
				wcout << L": failed to test." << endl;
			}

			++loopCount;
		}
		
		done();
	}

public:
	Tester( SmartPhoneBuffer& _completedSmartPhoneBuffer,
		SmartPhoneVector& _productArray, SmartPhoneVector& _faultySmartPhoneArray )
		: completedSmartPhoneBuffer( _completedSmartPhoneBuffer )
		, productArray( _productArray )
		, faultySmartPhoneArray( _faultySmartPhoneArray ) { }

	bool Test( SmartPhone* _pSmartPhone )
	{
		return ( 0 != _pSmartPhone->pCase && 0 != _pSmartPhone->pHardware
			&& 0 != _pSmartPhone->pSoftware );		
	}	
};

int main()
{
	// 케이스 생산자 agent 생성.
	int caseCountToCreate = 10;
	CaseBuffer caseBuffer;
	CaseProducer caseProducer( caseCountToCreate, caseBuffer );

	// 하드웨어 생산자 agent 생성.
	int hardwareCountToCreate = 10;
	HardwareBuffer hardwareBuffer;
	HardwareProducer hardwareProducer( hardwareCountToCreate, hardwareBuffer );

	// 부품( 케이스와 하드웨어 )을 조립하는 agent 생성.
	SmartPhoneBuffer	incompletedSmartPhoneBuffer;
	Assembler assembler( caseBuffer, hardwareBuffer, incompletedSmartPhoneBuffer );	
	
	// 소프트웨어 설치 agent 생성.
	SmartPhoneVector	faultySmartPhoneArray;

	SmartPhoneBuffer	completedSmartPhoneBuffer;
	SmartPhoneBuffer	faultySmartPhoneBuffer;
	Software android;
	SoftwareInstaller softwareInstaller( android, incompletedSmartPhoneBuffer,
		completedSmartPhoneBuffer, faultySmartPhoneArray );

	// 테스트하는 agent 생성.
	SmartPhoneVector	productArray;

	SmartPhoneBuffer	productBuffer;	
	Tester tester( completedSmartPhoneBuffer, productArray, faultySmartPhoneArray );

	// 모든 생성된 agent 작업 시작.
	caseProducer.start();
	hardwareProducer.start();
	assembler.start();
	softwareInstaller.start();
	tester.start();

	// 모든 agent 의 작업이 끝날 때까지 대기.
	agent* watingAgents[] = { &caseProducer, &hardwareProducer, &assembler,
		&softwareInstaller, &tester };

	agent::wait_for_all( sizeof( watingAgents ) / sizeof( agent* ), watingAgents );
	agent::wait( &caseProducer );
	agent::wait( &hardwareProducer );
	agent::wait( &assembler );
	agent::wait( &softwareInstaller );
	agent::wait( &tester );	

	// 완성된 제품 출력.
	wcout << L"completed products: " << endl;

	parallel_for_each( productArray.begin(), productArray.end(),
		[] ( SmartPhone* _pSmartPhone )
	{
		wcout << L"product uid: " << _pSmartPhone->uid << endl;
	});

	// 자원 정리 - 완제품.
	parallel_for_each( productArray.begin(), productArray.end(),
		[] ( SmartPhone* _pSmartPhone )
	{
		if( 0 != _pSmartPhone )
			delete _pSmartPhone;
	});	

	// 자원 정리 - 불량품.
	parallel_for_each( faultySmartPhoneArray.begin(), faultySmartPhoneArray.end(),
		[] ( SmartPhone* _pSmartPhone )
	{
		if( 0 != _pSmartPhone )
			delete _pSmartPhone;
	});
}

[ 코드1. 예제 코드 ]

실행 결과

 모든 agent 가 수행하는 작업이 비 동기적으로 호출되었고 결과를 기다립니다. 비 동기적으로 호출된 agent 의 작업들은 각각 독립적인 스레드에서 동시에 수행되며, 작업을 수행하는데 데이터가 필요하다면 메시지에 의해 필요한 데이터가 도착할 때까지 동기화되어 수행됩니다.

 wcout 객체는 동기화되지 않기 때문에 작업의 넘버링이 엉켜있는데 실제로 세어보면 정확히 맞아 떨어집니다.

[ 그림3. 코드1 실행 결과 ]

[ 그림3. 코드1 실행 결과 ]

메모리 누수

코드에 명시적인 메모리 누수가 없어도 Concurrency Runtime 에 의한 메모리 누수가 발견됩니다. 이것은 개발팀에서도 버그라고 인정하고 있습니다. 하지만 이번 2010 버전에는 수정되지 않을 것이고, 다음 버전인 서비스팩에 수정되어 포함될 것이라고 합니다.

참고: http://vsts2010.net/263

AAL 의 편리함

 위의 예제가 보여주다시피 AAL 을 사용하면 정말 간단하게 여러 객체의 작업을 비 동기적으로 동시에 수행시킬 수 있습니다.

만약 직접 Win32 API 를 사용하여 이를 구현하려 한다면 각 데이터를 보관하는 컨테이너에 대한 동기화 등과 같은 복잡한 메카니즘을 이벤트 등과 같은 동기화 객체를 사용하여 직접 구현해야 합니다.

물론, 결국은 OS의 커널( kernel ) 객체를 사용하여 구현되겠지만, 빠른 생산성의 이익과 동기화로 인한 스트레스로 인해 피폐해져 가는 인간성을 지킬 수 있다는 측면에서는 굉장히 편리한 도구임에 틀림없습니다.

 

마치는 글

 이번 글은 AAL 에 대한 개념과 어떻게 동작하는지 간단한 소개입니다. 다음 글에서 구현에 필요한 함수들과 사용법들을 이야기해 보겠습니다.

 

참고

  • 그림1. Concurrency Runtime Architecture - http://i.msdn.microsoft.com/dynimg/IC315446.png

안부 게시판에 Gerndal님이 아래의 코드를 실행하면 메모리 leak이 난다고 알려 주셨습니다.

 

#include "stdafx.h"

#include <ppl.h>

using namespace Concurrency;

 

int main()

{

_CrtSetDbgFlag( _CRTDBG_ALLOC_MEM_DF | _CRTDBG_LEAK_CHECK_DF );

 

parallel_invoke( [] { }, [] { } );

return 0;

}

 

코드를 보면 메모리 leak이 날 조건이 하나도 없습니다. 그런데 왜 메모리 leak이 날까요?

영문판 MSDN 커뮤니티에 가보면 같은 문제로 질문하고 있는 것을 찾을 수 있습니다.

 

메모리 leak이 감지되는 것은 정말 메모리 leak이 발생한 것은 아닙니다. 문제는 ConcRT의 스케줄러가 해제되기 전에 프로그램이 먼저 종료되기 때문에 발생하는 것입니다.

 

위 코드를 보면 스케줄러와 관련된 코드는 하나도 보이지 않지만 암묵적으로 사용하고 있습니다.

이 문제를 해결하기 위해서는 스케줄러를 시작 부분에서 명시적으로 정의하고 프로그램이 종료하기 전에 스케줄러를 명시적으로 해제하는 것으로 해결할 수 있습니다( 혹은 정말 메모리 leak이 아니니 그냥 무시해도 됩니다 ).

 

 

메모리 leak 경고를 발생시키지 않기 위해서는 아래와 같이 하면 됩니다.

 

int main()

{

    HANDLE hEvent = CreateEvent( NULL, TRUE, FALSE, NULL );

    CurrentScheduler::Create( SchedulerPolicy() );

    CurrentScheduler::RegisterShutdownEvent( hEvent );

 

 

    _CrtSetDbgFlag( _CRTDBG_ALLOC_MEM_DF | _CRTDBG_LEAK_CHECK_DF );

 

    parallel_invoke( [] {}, [] {} );

 

 

    CurrentScheduler::Detach();

    WaitForSingleObject( hEvent, INFINITE );

    CloseHandle( hEvent );

    Sleep(500);

 

    return 0;

}

 

암묵적으로 스케줄러를 정의했다면 프로그램이 종료될 때 깔끔하게 해제되어야 하는데 이 부분이 매끄럽지 못한 것이 아쉽습니다. 현재 관련 개발자는 이번 버전에서는 깔끔하게 처리하는 부분을 미처 넣지 못했지만 꼭 다음 버전(VC++ 11)에서는 꼭 해결하겠다고 이야기 합니다.

 

 

ps : 이것은 415일 세션에서 제가 언급하였습니다만 블로그에는 포스팅을 늦게 하게 되었습니다.

Parallel Patterns Library(PPL) - concurrent_queue - 2

VC++ 10 Concurrency Runtime 2010. 1. 16. 09:00 Posted by 알 수 없는 사용자

concurrent_queue는 사용 용도가 concurrent_vector 보다 더 많을 것 같아서 좀 더 자세하게 설명하겠습니다.

 

온라인 서버 애플리케이션의 경우 Producer-Consumer 모델이나 이와 비슷한 모델로 네트웍을 통해서 받은 패킷을 처리합니다. 즉 스레드 A는 네트웍을 통해서 패킷을 받으면 Queue에 넣습니다. 그리고 스레드 B Queue에서 패킷을 꺼내와서 처리합니다. 이 때 Queue는 스레드 A B가 같이 사용하므로 공유 객체입니다. 공유 객체이므로 패킷을 넣고 뺄 때 크리티컬섹션과 같은 동기 객체로 동기화를 해야 합니다. 이런 곳에 concurrent_queue를 사용하면 아주 좋습니다.

 

 

concurrent_queue를 사용하기 위한 준비 단계

 

너무 당연하듯이 헤더 파일과 네임스페이스를 선언해야 합니다.

 

헤더파일

#include <concurrent_queue.h>

 

네임스페이스

using namespace Concurrency;

을 선언합니다.

 

이제 사전 준비는 끝났습니다. concurrent_queue를 선언한 후 사용하면 됩니다.

concurrent_queue< int > queue1;

 

 


concurrent_queue에 데이터 추가

 

concurrent_queue에 새로운 데이터를 넣을 때는 push 라는 멤버를 사용합니다.

 

원형

void push( const _Ty& _Src );

 

STL deque push_back과 같은 사용 방법과 기능도 같습니다. 다만 스레스 세이프 하다는 것이 다릅니다. concurrent_queue는 앞 회에서 이야기 했듯이 스레드 세이프한 컨테이너이므로 제약이 있습니다. 그래서 deque 와 다르게 제일 뒤로만 새로운 데이터를 넣을 수 있습니다.

 

concurrent_queue< int > queue1;

queue1.push( 11 );

 

 

 

concurrent_queue에서 데이터 가져오기

 

데이터를 가져올 때는 try_pop 멤버를 사용합니다. 앞의 push의 경우는 STL deque와 비슷했지만 try_pop은 꽤 다릅니다.

 

원형

bool try_pop( _Ty& _Dest );

 

try_pop을 호출 했을 때 concurrent_queue에 데이터가 있다면 true를 반환하고 _Dest에 데이터가 담기며 concurrent_queue에 있는 해당 데이터는 삭제됩니다. 그러나 concurrent_queue에 데이터가 없다면 false를 즉시 반환하고 _Dest에는 호출했을 때의 그대로 됩니다.

 

concurrent_queue< int > queue1;

 

queue1.push( 12 );

queue1.push( 14 );

 

int Value = 0;

 

if( queue1.try_pop( Value ) )

{

           // queue1에서 데이터를 가져왔음

}

else

{

           // queue1은 비어 있었음.

}

 

 

 

concurrent_queue가 비어 있는지 검사

 

concurrent_queue가 비어 있는지 알고 싶을 때는 empty()를 사용합니다. 이것은 STL deque와 같습니다.

 

원형

bool empty() const;

 

비어 있을 때는 true를 반환하고 비어 있지 않을 때는 false를 반환합니다. 다만 empty를 호출할 때 비어 있는지 검사하므로 100% 정확하지 않습니다. 100% 정확하지 않다라는 것은 empty push, try_pop 이 셋은 스레드 세이프하여 동시에 사용될 수 있으므로 empty를 호출할 시점에는 데이터가 있어서 false를 반환했지만 바로 직후에 다른 스레드에서 try_pop으로 삭제를 해버렸다면 empty 호출 후 false를 반환했어 try_pop을 호출했는데 false가 반환 될 수 있습니다.

 

 

 

concurrent_queue에 있는 데이터의 개수를 알고 싶을 때

 

concurrent_queue에 있는 데이터의 개수를 알고 싶을 때는 unsafe_size 멤버를 사용합니다.

 

원형

size_type unsafe_size() const;

 

이것은 이름에서도 알 수 있듯이 스레드 세이프 하지 않습니다. 그래서 unsafe_size를 호출할 때 push try_pop이 호출되면 unsafe_size를 통해서 얻은 결과는 올바르지 않습니다.

 

 


concurrent_queue에 있는 데이터 순차 접근

 

concurrent_queue에 있는 데이터를 모두 순차적으로 접근하고 싶을 때는 unsafe_begin unsafe_end를 사용합니다.

 

원형

iterator unsafe_begin();

const_iterator unsafe_begin() const;

 

iterator unsafe_end();

const_iterator unsafe_end() const;

 

unsafe_begin을 사용하여 선두 위치를 얻고, unsafe_end를 사용하여 마지막 다음 위치(미 사용 영역)를 얻을 수 있습니다. 이것도 이름에 나와 있듯이 스레드 세이프 하지 않습니다.

 

 

 

모든 데이터 삭제


모든 데이터를 삭제할 때는 clear를 사용합니다. 이것은 이름에 unsafe라는 것이 없지만 스레드 세이프 하지 않습니다.

 

원형

template< typename _Ty, class _Ax >

void concurrent_queue<_Ty,_Ax>::clear();

 

 

 

제 글을 보는 분들은 C++을 알고 있다는 가정하고 있기 때문에 STL을 알고 있다고 생각하여 아주 간단하게 concurrent_queue를 설명 하였습니다.

 

concurrent_queue 정말 간단하지 않습니까? 전체적으로 STL deque와 비슷해서 어렵지 않을 것입니다. 다만 스레드 세이프 하지 않은 것들이 있기 때문에 이것들을 사용할 때는 조심해야 된다는 것만 유의하면 됩니다.

 

이것으로 Concurrency Runtime PPL에 대한 설명은 일단락 되었습니다.

이후에는 Concurrency Runtime의 다른 부분을 설명할지 아니면 Beta2에서 새로 추가된 C++0x의 기능이나 또는 이전에 설명한 것들을 더 깊게 설명할지 고민을 한 후 다시 찾아 뵙겠습니다.^^

 

 


참고

Producer-Consumer 모델 : 자바워크님의 http://javawork.egloos.com/2397148

MSDN concurrent_queue :

http://msdn.microsoft.com/en-us/library/dd504906(VS.100).aspx#queue

 

Parallel Patterns Library(PPL) - concurrent_queue - 1

VC++ 10 Concurrency Runtime 2009. 12. 18. 09:00 Posted by 알 수 없는 사용자

concurrent_queuequeue 자료구조와 같이 앞과 뒤에서 접근할 수 있습니다.

concurrent_queue는 스레드 세이프하게 enqueue와 dequeue(queue에 데이터를 넣고 빼는) 조작을 할 수 있습니다.

또 concurrent_queue반복자를 지원하지만 이것은 스레드 세이프 하지 않습니다.

 



concurrent_queuequeue의 차이점


concurrent_queuequeue는 서로 아주 비슷하지만 다음과 같은 다른 점이 있습니다.

( 정확하게는 concurrent_queue와 STL의 deque와의 차이점 이라고 할수 있습니다. )


- concurrent_queue enqueue dequeue 조작이 스레드 세이프 하다.


- concurrent_queue는 반복자를 지원하지만 이것은 스레드 세이프 하지 않다.


- concurrent_queue front pop 함수를 지원하지 않는다.

  대신에 try_pop 함수를 대신해서 사용한다.


- concurrent_queue back 함수를 지원하지 않는다.

  그러므로 마지막 요소를 참조하는 것은 불가능하다.


- concurrent_queue size 메소드 대신 unsafe_size 함수를 지원한다.

  unsafe_size는 이름 그대로 스레드 세이프 하지 않다.


 

 

스레드 세이프한 concurrent_queue의 함수


concurrent_queue에 enqueue 또는 dequeue 하는 모든 조작에 대해서는 스레드 세이프합니다.

 

- empty

- push

- get_allocator

- try_pop

 

empty는 스레드 세이프하지만 empty 호출 후 반환되기 전에 다른 스레드에 의해서 queue가 작아지던가 커지는 경우 이 동작들이 끝난 후에 empty의 결과가 반환됩니다.

 



스레드 세이프 하지 않은 concurrent_queue의 함수

 

- clear

- unsafe_end

- unsafe_begin

- unsafe_size

 

 


반복자 지원

 

앞서 이야기 했듯이 concurrent_queue는 반복자를 지원하지만 이것은 스레드 세이프 하지 않습니다. 그래서 이것은 디버깅 할 때만 사용할 것을 추천합니다.

또 concurrent_queue의 반복자는 오직 앞으로만 순회할 수 있습니다.


concurrent_queue는 아래의 반복자를 지원합니다.

 

- operator++

- operator*

- operator->

 

 

concurrent_queue는 앞서 설명한 concurrent_vector와 같이 스레드 세이프한 컨테이너지만 STL vector deque에는 없는 제약 사항도 있습니다. 우리들이 Vector deque를 스레드 세이프하게 래핑하는 것보다는 Concurrency Runtime에서 제공하는 컨테이너가 성능적으로 더 좋지만 모든 동작이 스레드 세이프하지 않고 지원하지 않는 것도 있으니 조심해서 사용해야 합니다.

 

 

다음에는 일반적인 queue에는 없고 concurrent_queue에서만 새로 생긴 함수에 대해서 좀 더 자세하게 설명하겠습니다.


ps : 앞 주에 Intel의 TBB에 대한 책을 보았습니다. 전체적으로 Concurrency Runtime과 비슷한 부분이 많아서 책을 생각 외로 빨리 볼 수 있었습니다. 제 생각에 TBB나
Concurrency Runtime를 공부하면 다른 하나도 아주 빠르고 쉽게 습득할 수 있을 것 같습니다.

Parallel Patterns Library(PPL) - concurrent_vector - 2

VC++ 10 Concurrency Runtime 2009. 12. 9. 09:00 Posted by 알 수 없는 사용자


concurrent_vector의 주요 멤버

 

자주 사용하는 것들과 STL vector에 없는 것들을 중심으로 추려 보았습니다.

멤버

스레드 세이프

 

at

O

 

begin

O

 

back

O

 

capacity

O

 

empty

O

 

end

O

 

front

O

 

grow_by

O

new

grow_to_at_least

O

new

max_size

O

 

operator[]

O

 

push_back

O

 

rbegin

O

 

rend

O

 

size

O

 

assign

X

 

clear

X

 

reserve

X

 

resize

X

 

shink_to_fit

X

new

 

concurrent_vector는 기존 요소의 값을 변경할 때는 스레드 세이프하지 않습니다. 기존 요소의 값을 변경할 때는 동기화 객체를 사용하여 lock을 걸어야 합니다.

 

 

concurrent_vector 사용 방법

 

concurrent_vector를 사용하기 위해서 먼저 헤더 파일을 포함해야 합니다.

concurrent_vector의 헤더 파일은 “concurrent_vector.h” 입니다.

 

concurrent_vector의 사용 방법은 STL vector를 사용하는 방법과 거의 같습니다. 그러니 STL vector에 없는 것들만 제외하고는 vector를 사용하는 방법을 아는 분들은 따로 공부해야 할 것이 거의 없습니다.

STL vector에 대해서 잘 모르시는 분들은 About STL : C++ STL 프로그래밍(4)-벡터 글을 참고해 주세요.

 


 

concurrent_vector 초 간단 사용 예


concurrent_vector를 사용한 아주 아주 간단한 예제입니다.^^

 

#include <ppl.h>

#include <concurrent_vector.h>

#include <iostream>

 

using namespace Concurrency;

using namespace std;

 

 

int main()

{

           concurrent_vector< int > v1;

           v1.push_back( 11 );

           return 0;

}

 

 


STL vector에는 없는 grow_by, grow_to_at_least 사용 법

 

grow_by vector의 크기를 확장해 줍니다.

예를 들어 현재 vector의 크기가(size()에 의한) 10인데 이것을 20으로 키우고 싶을 때 사용합니다.

 

원형은 아래와 같습니다.

iterator grow_by( size_type _Delta );

iterator grow_by( size_type _Delta, const_reference _Item );

 

grow_to_at_least는 현재 vector의 크기가 10인데 이것이 20보다 작을 때만 20으로 증가시키고 싶을 때 사용합니다.

원형은 아래와 같습니다.

iterator grow_to_at_least( size_type _N );

 

grow_bygrow_to_at_least의 반환 값은 추가된 처음 요소의 위치가 반복자입니다.

 

grow_by의 예제 코드입니다.

void Append ( concurrent_vector<char>& vector, const char* string) {

    size_t n = strlen(string) + 1;

    memcpy( &vector[vector_grow_by(n)], string, n+1 );

}

위 예제는 http://japan.internet.com/developer/20070306/27.html 에서 참고했습니다.

 

 


shink_to_fit


shink_to_fit는 메모리 사용량과 단편화를 최적화 시켜줍니다. 이것은 메모리 재할당을 하기 때문에 요소에 접근하는 모든 반복자가 무효화됩니다.


 

Intel TBB


CPU로 유명한 Intel에서는 멀티코어 CPU를 만들면서 병렬 프로그래밍을 좀 더 쉽고, 안전화고, 확장성 높은 프로그램을 만들 수 있도록 툴과 라이브러리를 만들었습니다.

라이브러리 중 TBB라는 병렬 프로그래밍 용 라이브러리가 있습니다. 아마 TBB를 아시는 분이라면 Concurrent Runtime PPL에 있는 것들이 TBB에 있는 것들과 비슷한 부분이 많다라는 것을 아실 것입니다.

VSTS 2010 Beta2가 나온지 얼마 되지 않아서 병렬 컨테이너에 대한 문서가 거의 없습니다. 그러나 TBB에 관한 문서는 검색을 해보면 적지 않게 찾을 수 있습니다. concurrent_vector에 대해서 좀 더 알고 싶은 분들은 Intel TBB에 대해서 알아보시면 좋을 것 같습니다.

( 참고로 TBB 관련 서적이 한국어로 근래에 출간되었습니다.  http://kangcom.com/sub/view.asp?sku=200911100001 )

 


다음에는 concurrent_queue에 대해서 알아 보겠습니다.

Parallel Patterns Library(PPL) - concurrent_vector - 1

VC++ 10 Concurrency Runtime 2009. 11. 29. 08:30 Posted by 알 수 없는 사용자

Visual Stuido 2010 Beta2가 나오면서 제가 기대하고 있었던 병렬 컨테이너가 드디어 구현되었습니다.

 

Concurrency Runtime(이하 ConRT)에는 총 3개의 병렬 컨테이너를 제공합니다. Beta2에서는 모두 다 구현되지는 못하고 concurrent_vector concurrent_queue 두 개가 구현되었습니다. 아직 구현되지 않은 것은 concurrent_hash_map 입니다.

 

세 개의 컨테이너들은 C++ STL의 컨테이너 중에서 가장 자주 사용하는 것으로 vector, deque, hash_map 컨테이너의 병렬 버전이라는 것을 이름을 보면 쉽게 알 수 있을 것입니다.

 

STL에 있는 컨테이너와 비슷한 이름을 가진 것처럼 사용 방법도 기존의 컨테이너와 비슷합니다. 다만 병렬 컨테이너 답게 스레드 세이프하며, 기존의 컨테이너에서 제공하는 일부 기능을 지원하지 못하는 제한도 있습니다.

 

 

몇 회에 나누어서 concurrent_vector concurrent_queue에 대해서 설명해 나가겠습니다.

이번에는 첫 번째로 concurrent_vector에 대한 것입니다.

 

 

 

concurrent_vector란?

 

STL vector와 같이 임의 접근이 가능한 시퀀스 컨테이너입니다. concurrent_vector는 멀티 스레드에서 요소를 추가하던가 특정 요소에 접근해도 안전합니다. 반복자의 접근과 순회는 언제나 멀티 스레드에서 안전해야 하므로 요소를 추가할 때는 기존의 인덱스와 반복자를 무효화 시키면 안됩니다.

 

 

concurrent_vector vector의 차이점


기능

vctor

Concurrent_vector

추가

스레드에 안전하지 않음

스레드에 안전

요소에 접근

스레드에 안전하지 않음

스레드에 안전

반복자 접근 및 순회

스레드에 안전하지 않음

스레드에 안전

push_back

가능

가능

insert

가능

불가능

clear

모두 삭제

모두 삭제

erase

가능

불가능

pop_back

가능

불가능

배열식 접근 예. &v[0]+2

가능

불가능

 

 

grow_by, grow_to_at_least (vector resize와 비슷)는 스레드에 안전

 

 

추가 또는 resize 때 기존 인덱스나 반복자의 위치가 바뀌지 않음

 

 

bool 형은 정의 되지 않았음

 


concurrent_vector에 대한 설명을 이번에는 소개 정도로 끝내고 다음부터는 본격적으로 Concurrent_vector을 어떻게 사용하면 되는지 상세하게 설명해 나가겠습니다.^^


task group에서의 병렬 작업 취소 - 1  에 이은 두 번째 글입니다.


3. 작업이 취소되었을 때 해야 할 것


취소는 그것을 호출했을 때 즉시 동작하지 않습니다. task group이 취소되면 런타임은 각 task interruption point를 발동하여 런타임을 throw 시켜서 활동중인 task가 취소될 때 내부 예외 형을 잡을 수 있습니다. Concurrency Runtime은 런타임이 언제 interruption point를 호출할지 정의되어 있지 않습니다. 런타임이 취소를 할 때 던지는 예외를 잡아서 처리할 필요가 있습니다.

그래서 만약 task의 처리 시간이 긴 경우는 정기적으로 취소 여부를 확인할 필요가 있습니다.

 

< 리스트 4. >

auto t5 = make_task([&] {

   // Perform work in a loop.

   for (int i = 0; i < 1000; ++i)

   {

      // To reduce overhead, occasionally check for

      // cancelation.

      if ((i%100) == 0)

      {

         if (tg2.is_canceling())

         {

            wcout << L"The task was canceled." << endl;

            break;

         }

      }

 

      // TODO: Perform work here.

   }

});

 

<리스트 4>의 굵게 표시한 코드는 task5가 정기적으로 task group tg2가 취소 되었는지 조사하고 있는 것입니다.

 

<리스트 4>는 명시적으로 task5가 속한 task group tg2가 취소되었는지 조사하고 있는데 만약 해당 task가 속한 task group을 직접적으로 호출하지 않고 취소 여부를 조사하고 싶을 때는 is_current_task_group_canceling() 을 사용합니다.

 

 

4. 병렬 알고리즘에서의 취소

 

task group에서 사용하는 병렬 알고리즘도 위에서 소개한 방법으로 취소할 수 있습니다.

 

< 리스트 5. Task group에서 병렬 알고리즘 사용 >

structured_task_group tg;

 

task_group_status status = tg.run_and_wait([&] {

   parallel_for(0, 100, [&](int i) {

      // Cancel the task when i is 50.

      if (i == 50)

      {

         tg.cancel();

      }

      else

      {

         // TODO: Perform work here.

      }

   });

});

 

// Print the task group status.

wcout << L"The task group status is: ";

switch (status)

{

case not_complete:

   wcout << L"not complete." << endl;

   break;

case completed:

   wcout << L"completed." << endl;

   break;

case canceled:

   wcout << L"canceled." << endl;

   break;

default:

   wcout << L"unknown." << endl;

   break;

}

 

<리스트 5> task group tg task를 넣을 때 병렬 알고리즘을 넣었습니다. 그리고 이번 beta2에 새로 생긴 run_and_wait 멤버를 사용하여 task의 실행이 끝날 때 까지 대기하도록 했습니다(예전에는 run 이후에 wait를 호출해야 했습니다).


물론 cancel이 아닌 예외를 발생 시켜서 취소 시킬 수도 있습니다.


< 리스트 6. 병렬 알고리즘에서 예외를 발생시켜서 취소 시키기 >

try

{

   parallel_for(0, 100, [&](int i) {

      // Throw an exception to cancel the task when i is 50.

      if (i == 50)

      {

         throw i;

      }

      else

      {

         // TODO: Perform work here.

      }

   });

}

catch (int n)

{

   wcout << L"Caught " << n << endl;

}

 

<리스트 6>은 하나의 task만 예외를 발생시키고 있기 때문에 task group의 모든 task를 취소

시키기 위해서는 모든 task에서 예외를 발생시켜야 합니다.

그래서 아래의 <리스트 7>과 같이 전역 변수 flag를 사용합니다.

 

< 리스트 7. 모든 병렬 알고리즘의 task 취소 시키기 >

bool canceled = false;

 

parallel_for(0, 100, [&](int i) {

   // For illustration, set the flag to cancel the task when i is 50.

   if (i == 50)

   {

      canceled = true;

   }

 

   // Perform work if the task is not canceled.

   if (!canceled)

   {

      // TODO: Perform work here.

   }

});

 

 


5. Parallel 작업을 취소를 사용하지 못하는 경우

 

취소 작업은 모든 상황에서 다 사용할 수 있는 것은 아닙니다. 특정 시나리오에서는 사용하지 못할 수가 있습니다. 예를 들면 어떤 task는 활동중인 다른 task에 의해 block이 풀렸지만 아직 시작하기 전에 task group이 최소되어 버리면 계속 시작하지 못하여 결과적으로 애플리케이션이 dead lock 상황에 빠지게 됩니다.




이것으로 task group에서의 병렬 작업의 취소에 대한 것은 마칩니다. 다음에는 Beta2에 드디어 구현된 Concurrency Container에 대해서 설명하겠숩니다.


참고 url
MSDN : http://msdn.microsoft.com/en-us/library/dd984117(VS.100).aspx

task group을 사용하여 복수의 작업을 병렬적으로 처리할 때 모든 작업이 끝나기 전에 작업을 취소 해야 되는 경우가 있을 것입니다. task group에서 이와 같은 취소 처리를 어떻게 하는지 알아보겠습니다.

 

Concurrency Rumtime에 대한 정보는 아직까지는 MSDN을 통해서 주로 얻을 수 있기 때문에 거의 대부분 MSDN에 있는 것을 제가 좀 더 보기 좋고 쉽게 전달할 수 있도록 각색을 하는 정도이니 이미 MSDN에서 보신 분들은 pass 하셔도 괜찮습니다.^^;

 

 

1. 병렬 작업의 tree

 

PPL task group를 사용하여 병렬 작업을 세분화하여 각 작업을 처리합니다. task group에 다른 task group를 넣으면 이것을 부모와 자식으로 tree 구조로 표현할 수 있습니다.

 

< 리스트 1. >

structured_task_group tg1;

 

auto t1 = make_task([&] {

   structured_task_group tg2;

 

   // Create a child task.

   auto t4 = make_task([&] {

      // TODO: Perform work here.

   });

 

   // Create a child task.

   auto t5 = make_task([&] {

      // TODO: Perform work here.

   });

 

   // Run the child tasks and wait for them to finish.

   tg2.run(t4);

   tg2.run(t5);

   tg2.wait();

});

 

// Create a child task.

auto t2 = make_task([&] {

   // TODO: Perform work here.

});

 

// Create a child task.

auto t3 = make_task([&] {

   // TODO: Perform work here.

});

 

// Run the child tasks and wait for them to finish.

tg1.run(t1);

tg1.run(t2);

tg1.run(t3);

 

<리스트 1>에서는 structured_task_group tg2 tg1에 들어가서 tg2 tg1의 자식이 되었습니다. 이것을 tree 그림으로 표현하면 아래와 같습니다.



< 그림 1. >

 

 

2. 병렬 작업의 취소 방법

 

parallel task를 취소할 때는 task group의 cancel 멤버를 사용하면 됩니다(task_group::cancel, structured_task_group::cancel). 또 다른 방법으로는 task에서 예외를 발생시키는 것입니다. 두 가지 방법 중 cancel 멤버를 사용하는 것이 훨씬 더 효율적입니다.


cancel을 사용하는 것을 top-down 방식으로 task group에 속한 모든 task를 취소시킵니다. 예외를 발생 시켜서 취소하는 방법은 bottom-up 방식으로 task group에 있는 각 task에서 예외를 발생시켜서 위로 전파시킵니다.



2.1. cancel을 사용하여 병렬 작업 취소

 

cancel 멤버는 task group canceled 상태로 설정합니다. cancel 멤버를 호출한 이후부터는 task group task를 처리하지 않습니다. task가 취소되면 task group wait에서는 canceled를 반환합니다.

 

cancel 멤버는 자식 task에서만 영향을 끼칩니다. 예를 들면 <그림 1> t4에서 tg2를 cancel하면 tg2에 속한 t4, t5 task만 취소됩니다. 그러나 tg1을 cancel하면 모든 task가 취소됩니다.

 

structured_task_group은 thread 세이프 하지 않기 때문에 자식 task에서 cancel을 호출하면 어떤 행동을 할지 알 수 없습니다. 자식 task cancel로 부모 task를 취소하던가 is_canceling로 취소 여부를 조사할 수 있습니다.

 

< 리스트 2. cancel을 사용하여 취소 >

auto t4 = make_task([&] {

   // Perform work in a loop.

   for (int i = 0; i < 1000; ++i)

   {

      // Call a function to perform work.

      // If the work function fails, cancel all tasks in the tree.

      bool succeeded = work(i);

      if (!succeeded)

      {

         tg1.cancel();

         break;

      }

   }  

});

 


2.2. 예외를 발생시켜 병렬 작업 취소


앞서 cancel 멤버를 사용하는 것 이외에 예외를 발생시켜서 취소 시킬 수 있다고 했습니다. 그리고 이것은 cancel()을 사용하는 것보다 효율이 좋지 않다고 했습니다.

예외를 발생시켜서 취소하는 방법의 예는 아래의 <리스트 3>의 코드를 보시면 됩니다.

 

< 리스트 3. 예외를 발생시켜서 취소 >

structured_task_group tg2;

 

// Create a child task.     

auto t4 = make_task([&] {

   // Perform work in a loop.

   for (int i = 0; i < 1000; ++i)

   {

      // Call a function to perform work.

      // If the work function fails, throw an exception to

      // cancel the parent task.

      bool succeeded = work(i);

      if (!succeeded)

      {

         throw exception("The task failed");

      }

   }        

});

 

// Create a child task.

auto t5 = make_task([&] {

   // TODO: Perform work here.

});

 

// Run the child tasks.

tg2.run(t4);

tg2.run(t5);

 

// Wait for the tasks to finish. The runtime marshals any exception

// that occurs to the call to wait.

try

{

   tg2.wait();

}

catch (const exception& e)

{

   wcout << e.what() << endl;

}

 

task_group이 structured_task_group wait는 예외가 발생했을 때는 반환 값을 표시하지 못합니다. 그래서 <리스트 3>의 아래 부분에서 try-catch에서 exception을 통해서 상태를 표시하고 있습니다.




아직 이야기가 다 끝난 것이 아닙니다. 나머지는 다음 글을 통해서 설명하겠습니다.^^



참고 url

MSDN : http://msdn.microsoft.com/en-us/library/dd984117(VS.100).aspx


Parallel Patterns Library(PPL) - combinable

VC++ 10 Concurrency Runtime 2009. 10. 28. 08:30 Posted by 알 수 없는 사용자

PPL에서 제공하는 알고리즘을 사용하여 병렬로 작업을 실행할 때 각 작업에서 접근하는 공유 리소스는 스레드 세이프 하지 않기 때문에 lock을 걸어서 공유 리소스를 보호해야 합니다.

 

그러나 lock을 건다는 것은 번거롭기도 하며 성능에 좋지 않은 영향을 미칩니다.

가장 좋은 방법은 공유 리소스에 lock을 걸지 않아도 스레드 세이프한 것이 가장 좋습니다.

 

combinable은 바로 위에 언급한 문제를 해결해 주는 것입니다. 모든 상황에 다 사용할 수 있는 것은 아니지만 특정 상황에서는 combinable을 사용하면 lock을 걸지 않아도 공유 리소스를 스레드 세이프하게 접근 할 수 있습니다.

 

 

combinable

combinable은 병렬로 처리하는 작업에서 각 작업마다 계산을 실행한 후 그 계산 결과를 통합할 때 사용하는 재 사용 가능한 스레드 로컬 스트레지를 제공합니다.

 

combinable은 복수의 스레드 또는 태스크 간에 공유 리소스가 있는 경우에 사용하면 편리합니다. combinable는 공유 리소스의 접근을 각 스레드 별로 제공하여 공유 상태를 제거할 수 있습니다.

 


스레드 로컬 스트리지

스레드 프로그래밍을 공부하시면 스레드 고유의 로컬 스트리지를 만들어서 해당 스레드는 자신의 로컬 스트리지에 읽기,쓰기를 하여 다른 스레드와의 경합을 피하는 방법을 배울 수 있습니다.

combinable은 이 스레드 로컬 스트리지와 비슷한 방법입니다.

 

 

combinable의 메소드 설명

combinable::local : 현재 스레드 컨텍스트와 관련된 로컬 변수의 참조를 얻는다.

combinable::clear : 오브젝트로부터 모든 스레드 로컬 변수를 삭제한다.

combinable::combine : 제공하고 있는 있는 조합 함수를 사용하여 모드 스레드 로컬 계산의 set으로부터 최종적인 값을 만든다.

combinable::combinable_each ; 제공하고 있는 조합 함수를 사용하여 모든 스레드 로컬 계산의 set으로부터 최종적인 값을 만든다.

 

 

combinable은 최종 결합 결과 타입의 파라미터를 가지고 있는 템플릿 클래스입니다. 기본 생성자를 호출하면 기본 생성자와 복사 생성자 _Ty 템플릿 파라미터 형이 꼭 있어야합니다. _Ty 템플릿 파라미터 형이 기본 생성자를 가지지 않는 경우 파라미터로 초기화 함수를 사용하는 생성자로 오버로드 되어 있는 것을 호출합니다.

 

combinable을 사용하여 모든 작업에서 처리한 계산 결과 값을 얻을 때는 combine()을 사용하여 합계를 구하던가, combine_each를 사용하여 각 작업에서 계산한 값을 하나씩 호출하여 계산합니다.

 

< 예제 1. Combinable을 사용하지 않고 lock을 사용할 때 >

……

int TotalItemPrice1 = 0;

critical_section rt;

parallel_for( 1, 10000, [&]( int n ) {

                     rt.lock();

                     TotalItemPrice += n;

                     rt.unlock();

                     }         

);

………


<예제 1>critical_section을 사용하여 TotalItemPrice 변수를 보호하고 있습니다.

그럼 <예제 1> combunable을 사용하여 구현해 보겠습니다.

 

< 예제 2. Combinable 사용 >

#include <ppl.h>

#include <iostream>

 

using namespace Concurrency;

using namespace std;

 

 

int main()

{

           combinable<int> ItemPriceSum;

           parallel_for( 1, 10000, [&]( int n ) {

                                ItemPriceSum.local() += n;

                                }         

                     );

 

           int TotalItemPrice = ItemPriceSum.combine( [](int left, int right) {

                                          return left + right;}

                                );

 

           cout << "TotalItemPrice : " << TotalItemPrice << endl;

          

          

           getchar();

           return 0;

}

 

combinable을 사용하면 <예제 1>과 다르게 lock을 걸지 않아도 되기 때문에 훨씬 성능이 더 좋습니다. 다만 모든 곳에서 사용할 수는 없기 때문에 <예제 2>와 같이 어떤 계산의 최종 결과를 구할 때 등 사용할 수 있는 곳을 잘 찾아서 사용해야 합니다.

 

<예제 2>는 각 태스크에서 계산된 결과를 더하기 위해서 conbinablecombine 멤버를 사용했지만 각 태스크의 결과를 하나씩 순회할 때는 conbinablecombine _each 멤버를 사용합니다.

그리고 저는 <예제 2>에서 int combinable에 사용했지만 int 이외에 유저 정의형이나 STL list와 같은 컨테이너도 사용할 수 있습니다.

 


combinable에서 combine_each() 멤버나 combinable에서 STL list 컨테이너를 사용한 MSDN에 있는 예제는 아래와 같습니다.

#include <ppl.h>

#include <vector>

#include <list>

#include <algorithm>

#include <iostream>

 

using namespace std;

using namespace Concurrency;

 

int main()

{

   // Create a vector object that contains the values 1 through 10.

   vector<int> values(10);

  

   int n = 0;

   generate(values.begin(), values.end(), [&] { return ++n; } );

 

   // Generate the list of odd elements of the vector in parallel

   // by using the parallel_for_each algorithm and a combinable object.

   combinable<list<int>> odds;

   parallel_for_each(values.begin(), values.end(), [&](int n) {

         if (n % 2 == 1)

            odds.local().push_back(n);

       });

 

   // Combine all thread-local elements into the final result.

   list<int> result;

   odds.combine_each([&](list<int>& local) {

           // Merge the local list into the result so that the results

           // are in numerical order.

           local.sort(less<int>());

           result.merge(local, less<int>());

        });

 

   // Print the result.

   cout << "The odd elements of the vector are:";

   for_each(result.begin(), result.end(), [](int n) {

          cout << ' ' << n;

        });

}


이 글은 MSDN 글, "Solving The Dining Philosophers Problem With Asynchronous Agents"를 참고하여 작성되었습니다.

Asynchronous Agents Library로 Dining Philosophers 문제 해결하기 - 1
Asynchronous Agents Library로 Dining Philosophers 문제 해결하기 - 2

오래 기다리셨습니다; 그간 일이 바빠서;; 어쨌든 지난번에 Concurrecy::agent 에서 상속받은 Philosopher 클래스를 살펴봤었죠. 아래 두 함수만 제외하고 말입니다.

자 먼저 젓가락을 집는 함수입니다. 젓가락 한쌍을 동시에 집어야지 하나만이라도 먼저 집으려고 하다간 서로 젓가락 하나씩 잡고 기다리는 데드락 상황이 발생할 수 있습니다. 이를 위해 쓰이는 것이 지난 회에 잠깐 언급했든 join 메시지 블록입니다. 그 중에서도 non_greedy 버전을 사용해야 합니다. non_greedy 버전은 명시된 타겟을 모두 얻을 수 있을 때에만 실제 획득을 시도합니다. gready 버전을 사용하면 전술한 것처럼 데드락이 발생할 수 있습니다.

   73     std::vector<Chopstick*> PickupChopsticks()

   74     {

   75         //join 생성

   76         Concurrency::join<Chopstick*,Concurrency::non_greedy> j(2);

   77         m_LeftChopstickProvider->link_target(&j);

   78         m_RightChopstickProvider->link_target(&j);

   79 

   80         //젓가락 한쌍을 집습니다.

   81         return Concurrency::receive (j);

   82     } 


젓가락을 내려놓은 것은 간단합니다. 비동기 메시지 전송 함수인 Concurrency::asend()를 사용하여 젓가락이 이용가능함을 알리면 끝입니다.

   83     void PutDownChopsticks(std::vector<Chopstick*>& v)

   84     {

   85         Concurrency::asend(m_LeftChopstickProvider,v[0]);

   86         Concurrency::asend(m_RightChopstickProvider,v[1]);

   87     }


마지막으로 철학자들과 젓가락, 젓가락제공자를 가지고 이들 모두를 셋업하는 역할을 하는 Table 클래스입니다. 주석을 참고하시면 쉽게 이해하실 수 있을 겁니다.

  100 template<class PhilosopherList>

  101 class Table

  102 {

  103     PhilosopherList & m_Philosophers;

  104     std::vector<ChopstickProvider*> m_ChopstickProviders;

  105     std::vector<Chopstick*> m_Chopsticks;

  106 

  107     //이 생성자는 Table 클래서에서 유일한 public 메소드로 vector 변수들을 초기화하고 각 철학자에게 젓가락제공자를 할당합니다:

  108 public:

  109     Table(PhilosopherList& philosophers): m_Philosophers(philosophers)

  110     {

  111         //젓가락 및 젓가락제공자 vector를 채웁니다

  112         for(size_t i = 0; i < m_Philosophers.size();++i)

  113         {

  114             m_ChopstickProviders.push_back(new ChopstickProvider());

  115             m_Chopsticks.push_back(new Chopstick("chopstick"));

  116             //젓가락제공자에 젓가락을 놓습니다

  117             send(m_ChopstickProviders[i],m_Chopsticks[i]);

  118         }

  119         //철학자들을 식탁 자리에 앉힙니다

  120         for(size_t leftIndex = 0; leftIndex < m_Philosophers.size();++leftIndex)

  121         {

  122             //rightIndex 계산

  123             size_t rightIndex = (leftIndex+1)% m_Philosophers.size();

  124 

  125             //왼쪽,오른쪽 제공자를 해당 철학자에 부여합니다

  126             Concurrency::asend(& m_Philosophers[leftIndex].LeftChopstickProviderBuffer,

  127                 m_ChopstickProviders[leftIndex]);

  128             Concurrency::asend(& m_Philosophers[leftIndex].RightChopstickProviderBuffer,

  129                 m_ChopstickProviders[rightIndex]);

  130         }

  131     }

  132     ~Table(){

  133         m_ChopstickProviders.clear();

  134         m_Chopsticks.clear();

  135     }

  136 

  137 };


드디어 대망의 main() 함수입니다. 상태표시를 위한 call 블록과 C++0x 람다의 사용 이외에는, 전술할 클래스들을 사용하고 있을 뿐입니다.

  206 int main()

  207 {

  208     //tr1 array를 사용해 철학자들을 생성합니다

  209     std::tr1::array<Philosopher,5> philosophers = {"Socrates", "Descartes", "Nietzche", "Sartre", "Amdahl"};

  210     Table<std::tr1::array<Philosopher,5>> Table(philosophers);

  211     //상태표시에 이용할 call 블록들의 목록을 생성합니다

  212     std::vector<Concurrency::call<PhilosopherState>*> displays;

  213     //철학자 에이전트를 구동하고 상태표시 항목을 생성합니다

  214     std::for_each(philosophers.begin(),philosophers.end(),[&displays](Philosopher& cur)

  215     {

  216         //상태표시용 call 블록을 하나 만듭니다

  217         Concurrency::call<PhilosopherState>* consoleDisplayBlock = new Concurrency::call<PhilosopherState>([&](PhilosopherState in){

  218             //cout은 각 출력 사이의 스레드안정성을 보장하지 않습니다

  219             if(in == Eating)

  220                 std::cout << cur.m_Name << " is eating\n";

  221             else

  222                 std::cout << cur.m_Name << " is  thinking\n";

  223         });

  224         //상태표시 블록을 연결하고 벡터에 저장해둡니다

  225         cur.CurrentState.link_target(consoleDisplayBlock);

  226         displays.push_back(consoleDisplayBlock);

  227         //그리고 에이전트를 구동합니다

  228         cur.start();

  229     });

  230     //모두 완료되기를 대기

  231     std::for_each(philosophers.begin(),philosophers.end(),[](Philosopher& cur)

  232     {

  233         cur.wait(&cur);

  234     });

  235 

  236     displays.clear();

  237 };


이상을 실행하면 다음과 유사한 결과를 확인하실 수 있습니다.


주석에도 나와있듯이 스레드에 안전하지 않은 cout 출력으로 가끔 상태 메시지가 섞여였음을 확인할 수 있습니다. 그것 이외에는 철학자들이 사이좋게 식사를 하고 있음을 알 수 있습니다.

이렇듯 AAL을 사용하면 저수준의 스레드 함수나 동기화 개체들을 직접 다루지 않고도 쉽게 병렬 수행 작업을 작성할 수 있습니다. 병렬화에 고민하지 않고, 해당 응용프로그램의 도메인 문제에만 집중할 수 있는 것이죠.


이상입니다. 이제 새로운 로고와 함께 VS2010의 베타2도 나왔으니, 새로운 주제로 다시 찾아뵙지요. ^^

Parallel Patterns Library(PPL) - parallel_invoke

VC++ 10 Concurrency Runtime 2009. 10. 20. 08:30 Posted by 알 수 없는 사용자

parallel_invoke는 일련의 태스크를 병렬로 실행할 때 사용합니다. 그리고 모든 태스크가 끝날 때까지 대기합니다. 이 알고리즘은 복수의 독립된 태스크를 실행할 때 유용합니다.

 

일련의 태스크를 병렬로 실행할 때 사용이라는 것을 들었을 때 생각나는 것이 없는가요? 지금까지 제가 올렸던 글을 보셨던 분이라면 parallel task라는 말이 나와야 합니다. ^^

parallel_invoke parallel task와 비슷합니다.

 

 

parallel_invoke parallel task의 다른 점

복수 개의 태스크를 병렬로 실행한다는 것은 둘 다 같지만 아래와 같은 차이점이 있습니다.


 

parallel_invoke

parallel task

편이성

작업 함수만 정의하면 된다.

작업 함수를 만든 후 task handle로 관리해야 한다.

태스크 개수

10개 이하만 가능

제한 없음

모든 태스크의 종료 시 대기

무조건 모든 태스크가 끝날 때까지 대기

Wait를 사용하지 않으면 대기 하지 않는다.



parallel_invoke를 사용할 때

병렬로 실행할 태스크의 개수가 10개 이하이고, 모든 태스크가 종료 할 때까지 대기해도 상관 없는 경우에는 간단하게 사용할 수 있는 parallel_invoke를 사용하는 것이 좋습니다. 하지만 반대로 병렬로 실행할 태스크가 10개를 넘고 모든 태스크의 종료를 대기하지 않아야 할 때는 parallel task를 사용해야 합니다.

 

 

parallel_invoke 사용 방법

parallel_invoke는 병렬로 태스크를 두 개만 실행하는 것에서 10개까지 실행하는 9개의 버전이 있으며 파라미터를 두 개만 사용하는 것에서 10개의 파라미터를 사용하는 것으로 오버로드 되어 있습니다.

각 오버로드된 버전의 파라미터에는 태스크를 정의한 작업 함수를 넘겨야 합니다.

 

 

parallel_invoke 사용 예

아래 예제는 아주 간단한 것으로 게임 프로그램이 처음 실행할 때 각종 파일을 로딩하는 것을 아주 간략화 하여 parallel_invoke를 사용한 예입니다.

 

#include <iostream>

#include <ctime>

#include <windows.h>

#include <concrt.h>

#include <concrtrm.h>

using namespace std;

 

#include <ppl.h>

using namespace Concurrency;

 

// UI 이미지 로딩

void LoadUIImage()

{

           Sleep(1000);

           cout << "Load Complete UI Image" << endl;

}

 

// 텍스쳐 로딩

void LoadTexture()

{

           Sleep(1000);

           cout << "Load Complete Texture" << endl;

}

 

// 폰트 파일 로딩

void LoadFont()

{

           Sleep(1000);

           cout << "Load Complete Font" << endl;

}

 

int main()

{

           parallel_invoke( [] { LoadUIImage(); },

                      [] { LoadTexture(); },

                      [] { LoadFont(); }

                    );

          

           getchar();

           return 0;

}

 

< 실행 결과 >



위 예제를 parallel_invoke를 사용하지 않고 전통적인 방법으로 순서대로 실행했다면 각 작업 함수에서 1초씩 소비하므로 3초가 걸리지만 parallel_invoke를 사용하여 1초만에 끝납니다.

 

그리고 이전에 parallel_for에서도 이야기 했듯이 병렬로 실행할 때는 순서가 지켜지지 않는다는 것을 꼭 유의하시기 바랍니다. 위의 예의 경우도 LoadUIImage()을 첫 번째 파라미터로 넘겼지만 실행 결과를 보면 LoadFont()가 먼저 완료 되었습니다.

 


마지막으로 위의 예제코드에서 parallel_invoke와 관계 있는 부분만 추려볼 테니 확실하게 사용 방법을 외우시기를 바랍니다.^^

 

#include <ppl.h>

using namespace Concurrency;

 

// 태스크 정의

void LoadUIImage()

{

............

}

 

void LoadTexture()

{

............

}

 

void LoadFont()

{

............

}

 

int main()

{

        parallel_invoke( [] { LoadUIImage(); },

                                 [] { LoadTexture(); },

                                 [] { LoadFont(); }

                          );

 

}


이 글은 MSDN 글, "Solving The Dining Philosophers Problem With Asynchronous Agents"를 참고하여 작성되었습니다.

Asynchronous Agents Library로 Dining Philosophers 문제 해결하기 - 1

자, 이제 본격적으로 코드를 살펴보기 전에 메시지 블록이 무엇인지 먼저 짚고 넘어가겠습니다. AAL액터모형을 사용한다고 말씀드렸습니다. 또한, 액터모형에서 액터들은 메시지만으로 통신한다고 말씀드렸죠. 이 때 메시지를 받는 대상 혹은 메시지의 출처의 역할을 하는 것이 메시지 블록입니다. 전자의 경우 목적(target) 블록이라 하고, 후자는 원천(source) 블록이 됩니다.

전회에서 이번 예제에 쓰이는 네가지 메시지 블록을 소개했었는데요. unbounded_buffer는 목적 및 원천으로 쓰이며 큐와 같이 여럿의 메시지를 담고 있을 수 있는 놈입니다. overwrite_buffer는 하나의 변수처럼 값 하나만을 지니며, 새로 메시지가 올 경우 기존 값은 덮어씌여집니다. 역시 원천으로도 쓰일 수 있으며, 이 경우 사본을 보냅니다. 반면, call목적 블록으로만 쓰여 메시지 도착 시 특정 함수개체를 불러주는 기능을 합니다. join은 이번 예제에서 핵심 역할을 하는 블록으로서 여러 메시지를 동시에 기다려 하나로 묶어 출력하는 기능을 합니다.

먼저 가장 간단한 Chopstick 클래스를 살펴보죠.

   22 class Chopstick{

   23     const std::string m_Id;

   24 public:

   25     Chopstick(std::string && Id):m_Id(Id){};

   26     const std::string GetID()

   27     {

   28         return m_Id;

   29     };

   30 };


이와 같이 젓가락 식별용의 문자열을 가질뿐입니다. 생성자에서 r-value 참조를 쓰고 있다는 것 정도가 주목할만한 사항이겠군요.

다음은 ChopstickProvider로 다음과 같이 단순히 typedef입니다.

   34 typedef Concurrency::unbounded_buffer<Chopstick*> ChopstickProvider;


unbounded_buffer 메시지 블록을 이용해 메시지로 젓가락을 받으면 담고 있다가 철학자의 요청이 있으면 제공하는 역할을 합니다. 물록 철학자가 한입 먹고 나선 다시 젓가락을 놓으면 다시 받아놓는 역할도 합니다. 이 예제에서는 unbounded_buffer의 개수무제한(unbounded) 특성이 사실 굳이 필요 없습니다만 그래도 unbounded_buffer의 move semantic이 필요하기에(이 점에서 사본을 보내는 overwrite_buffer와는 다르죠) 이를 쓰는 것입니다.

다음이 대망의 Philosopher 클래스가 되겠습니다. 먼저, Concurrency::agent에서 public 상속을 받고 있는 것을 확인할 수 있습니다. 말씀드린 것처럼 각 철학자가 액터가 되어 독립적으로 동작하기 (즉, 별도 스레드로) 위함입니다.

   35 class Philosopher : public Concurrency::agent

   36 {

   37     ChopstickProvider* m_LeftChopstickProvider;

   38     ChopstickProvider* m_RightChopstickProvider;

   39 

   40 public:

   41     const std::string m_Name;

   42     const size_t  m_Bites;

   43     Philosopher(const std::string&& name, size_t bites=10):m_Name(name),m_Bites(bites){};

   44     Concurrency::unbounded_buffer<ChopstickProvider*> LeftChopstickProviderBuffer;

   45     Concurrency::unbounded_buffer<ChopstickProvider*> RightChopstickProviderBuffer;

   46     Concurrency::overwrite_buffer<PhilosopherState> CurrentState;

   47     void run()

   48     {

   49 

   50         //run에서 제일 먼저 해야하는 것은 ChopstickProvider를 초기화하는 것입니다. 여기서는 receive를 통해 public 변수에 메시지가 도착하기를 기다리게 하는 방식으로 처리합니다:

   51 

   52         //ChopstickProvider들을 초기화합니다.

   53         m_LeftChopstickProvider  = Concurrency::receive(LeftChopstickProviderBuffer);

   54         m_RightChopstickProvider = Concurrency::receive(RightChopstickProviderBuffer);

   55 

   56         //이제 생각하다가 먹기를 반복해야 합니다. 그를 위해 아직 등장하지 않은 두 함수(PickupChopsticks과 PutDownChopsticks)를 이용하려고 합니다:

   57 

   58         for(size_t i = 0; i < m_Bites;++i)

   59         {

   60             Think();

   61             std::vector<Chopstick*> chopsticks(PickupChopsticks());

   62             Eat();

   63             PutDownChopsticks(chopsticks);

   64         }

   65 

   66         //남은 일은 run 메소드를 나가기 전에 정리 작업을 하는 것인데, 다른 곳에 쓰일 수 있도록 ChopstickProvider를 반환하고 에이전트의 상태를 완료로 설정하고 있습니다.

   67         Concurrency::send(LeftChopstickProviderBufferm_LeftChopstickProvider);

   68         Concurrency::send(RightChopstickProviderBuffer, m_RightChopstickProvider);

   69 

   70         this->done(Concurrency::agent_done);

   71     }

   72 

   73     std::vector<Chopstick*> PickupChopsticks()

   74     {

   75         //join 생성

   76         Concurrency::join<Chopstick*,Concurrency::non_greedy> j(2);

   77         m_LeftChopstickProvider->link_target(&j);

   78         m_RightChopstickProvider->link_target(&j);

   79 

   80         //젓가락을 듭니다.

   81         return Concurrency::receive (j);

   82     } 

   83     void PutDownChopsticks(std::vector<Chopstick*>& v)

   84     {

   85         Concurrency::asend(m_LeftChopstickProvider,v[0]);

   86         Concurrency::asend(m_RightChopstickProvider,v[1]);

   87     }

   88 private:

   89     void Eat()

   90     {

   91         send(&CurrentState,Eating);

   92         RandomSpin();

   93     };

   94     void Think()

   95     {

   96         send(&CurrentState,Thinking);

   97         RandomSpin();

   98     };

   99 };


그 다음으로 한쌍의 젓가락을 위한 두 ChopstickProvider 포인터 변수(m_LeftChopstickProvider, m_RightChopstickProvider)가 보입니다. 철학자 이름(m_Name)과 몇번 먹을지를 나타내는 변수(m_Bites), 생성자까지는 파악하시는데 어려움이 없을 겁니다.

ChopstickProvider (이 자체도 unbounded_buffer인데) 포인터를 템플릿 인자로 가지는 unbounded_buffer 변수 한쌍이 등장하는데요. (44,45줄) 철학자가 젓가락을 소유하고 있는 상황이 아니고 철학자와는 별개로 젓가락들이 존재하는 상황이기에 필요한 변수들입니다. 이 두 public 변수들을 통해, 나중에 철학자들에게 필요할 때 젓가락을 제공해주는 ChopstickProvider를, 어딘가에서 받을 수 있습니다. 이들을 갖추고 나면 그 후부터 생각하다가 먹다가 할 수 있겠죠.

그 뒤로 run 메소드가 나옵니다. 실제 액터가 구동되면 수행될 함수입니다. 먼저, 전술한 두 변수를 통해 ChopstickProvider가 제공되기를 기다립니다. 이 때 Concurrency::receive 함수를 쓰고 있습니다. (이의 비동기 버전인 Concurrency::try_receive도 있습니다.)

58줄부터는 생각하다 먹기를 반복하는 반복문이 나옵니다. ThinkEat 함수는 89줄 이하에서 확인할 수 있는 것처럼 철학자의 현재 상태를 나타내는 overwrite_buffer 형의 변수 CurrentState를 설정하는 것 이외에는 특별히 하는 일이 없습니다. 그냥 시간을 좀 지체할 뿐입니다.

그리고 이 두 함수 호출 사이에 PickupChopsticksPutDownChopsticks 함수를 써서 실제 가장 중요한 젓가락 한 쌍을 안전하게 획득하고 다시 내려놓는 일을 합니다.


이에 대한 설명은 다음 회를 기대해주세요~ ^^

C++ STL을 알고 있는 분들은 ‘parallel_for_each’에서 ‘parallel_’만 빼면 남는 ‘for_each’는 본적이 있고 사용해본 경험도 있을 것입니다.

 

parallel_for가 for문을 병렬화 한 알고리즘이라면 parallel_for_each는 STL의 for_each 알고리즘을 병렬화 한 것입니다.

 

STL 컨테이너에 있는 데이터를 처리할 때 for_each를 사용한 것을 쉽게 parallel_for_each로 바꾸어 아주 손 쉽게 병렬화 할 수 있습니다.

 

 

parallel_for_each의 원형

 

template < typename _Input_iterator, typename _Function >

_Function parallel_for_each( _Input_iterator _First,  _Input_iterator _Last,   _Function _Func );

 

_First : 시작 위치

_Last : 마지막 위치

_Func : 병렬 처리에 사용할 함수(함수 객체, 함수, 람다 식)

 

for_each에 대해서 알고 있는 분들은 앞서 소개한 parallel_for 보다 더 쉽다고 느낄 것입니다. 기존의 for_each가 사용하는 파라미터도 같습니다. 기존에 사용했던 for_each parallel_for_each로 바꿀려면 알고리즘 이름만 바꾸어도 됩니다.

 

 

 

초 간단 parallel_for_each 사용 방법

 

1. 필요한 헤더 파일 포함

#include <ppl.h>


2.네임 스페이스 선언

using namespace Concurrency;

 

3. parallel_for_each에서 사용할 함수 정의

 

4. parallel_for_each에서 사용할 STL 컨테이너 정의

 

5. parallel_for_each 사용

 

 

 

parallel_for_each를 사용하는 간단한 예제


#include <iostream>

#include <algorithm>

#include <vector>

using namespace std;

 

#include <ppl.h>

using namespace Concurrency;

 

int main()

{

     vector< int > ItemCdList(10);

     generate( ItemCdList.begin(), ItemCdList.end(), []() -> int {

                                       int n = rand();

                                       return n; }

              );

 

      cout << "for_each" << endl;

      for_each( ItemCdList.begin(), ItemCdList.end(), [] (int n) {

                            cout << "<" << n << ">"; } );

      cout << endl << endl;

 

      cout << "parallel_for_each - 1" << endl;

      parallel_for_each( ItemCdList.begin(), ItemCdList.end(), [] (int n) {

                                    cout << "<" << n << ">"; }

                        );

      cout << endl << endl;

 

      cout << "parallel_for_each - 2" << endl;

      critical_section rt;

      parallel_for_each( ItemCdList.begin(), ItemCdList.end(), [&] (int n) {

                               rt.lock();

                              cout << "<" << n << ">";

                               rt.unlock(); }

                       );

 

      getchar();

      return 0;

}

 


위 예제는 vecter 컨테이너에 램덤으로 10개의 숫자 값을 채운 후 출력하는 것입니다.


for_each paralle_for_each 사용 방법이 이름만 다를 뿐 똑 같습니다.




위 예제를 초 간단 parallel_for_each 사용 방법의 순서에 비추어 보면 아래 그림과 같습니다.

 

 

위 예제의 결과입니다.

 



공유 자원 동기화 문제


parallel_for 때도 잠시 언급했듯이 parallel_for_each는 순서대로 실행하지 않고 병렬로 실행하므로 for_each를 사용한 것과 비교해 보면 출력 순서가 서로 다릅니다.

그리고 특히 문제가 되는 것이 공유 자원을 사용할 때 따로 동기화 시키지 않으면 원하지 않는 결과가 나옵니다.

 



위와 같은 잘못된 결과는 나올 수도 있고 안 나올 수도 있습니다. 즉 타이밍에 의해서 발생하는 것이기 때문입니다. 이것이 병렬 프로그래밍의 어려움 중의 하나인데 에러가 언제나 발생하면 빨리 발견하여 처리할 수 있는데 공유 자원을 동기화 하지 않았을 때 발생하는 문제는 바로 발생할 수도 있고 때로는 여러 번 실행했을 때 간혹 나올 때도 있어서 버그 찾기에 어려움이 있습니다.

 

공유 자원의 동기화가 깨어지는 것을 막기 위해서는 동기화 객체를 사용하면 됩니다. 위 예제에서 두 번째 사용한 parallel_for_each‘critical_section’이라는 동기화 객체를 사용하여 공유 자원을 안전하게 보호하고 있어서 올바르게 값을 출력하고 있습니다.

‘critical_section’에 대해서는 다음 기회에 자세하게 설명하겠습니다.

 

parallel_for_each에 대해서는 이것으로 마무리하고 다음 번에는 parallel_invoke에 대해서 설명하겠습니다.

 

이 글은 MSDN 글, "Solving The Dining Philosophers Problem With Asynchronous Agents"를 참고하여 작성되었습니다.

오늘은 AAL(Asynchronous Agents Library)의 액터기반프로그래밍을 사용하여, 동기화 개체들로는 해법이 상당히 골치아프기로 유명한 "철학자들의 식사(Dining Philosophers) 문제"를 풀어보겠습니다. 내용이 길어질듯 하여 3회의 연재글로 구성하려 합니다.

먼저 간단히 철학자들의 식사 문제를 소개하면,


간단히 위 그림과 같은 상황입니다. 철학자 다섯명이 식사를 하는데 젓가락(그림에는 포크지만 상관없습니다;)이 보시는바와 같이 역시 다섯개뿐입니다. 그들은 철학자답게 생각하다가 한입 먹다가를 반복합니다. 한입 먹으려면 젓가락 한쌍이 필요해서 옆사람이 사이에 놓인 젓가락을 이미 선점해 먹고 있다면 기다려야 하는 것이죠. 공유 상태를 고려하지 않고 구현하면 데드락 등으로 철학자가 굶는(starvation) 상황이 발생할 수 있습니다. 이 문제는 저명한 컴퓨터과학자 다익스트라가 처음 제시하였습니다. 모니터 등의 동기화 개체를 사용하여 해결하는 방법이 기존에 많이 설명되어 있습니다만... 솔직히 이해하기가 쉽지 않고 구현도 어렵습니다.

이때 AAL이 제공하는 액터모형을 이용하면 그러한 난해함이나 복잡함 없이 이 문제를 해결할 수 있습니다. 액터모형은 독립적으로 동작하며 서로간에는 오로지 메시지만으로 소통하는(즉, 공유 상태를 가지지 않는) 액터들로 시스템을 모델링하는 방법이라 하겠습니다.

본 예제에서는 철학자를 액터(AAL 용어로는 에이전트)로 보고 메시지 전달을 위해 AAL에서 제공하는 몇몇 메시지 블록(message block)들을 사용하여 철학자들의 식사 문제를 해결합니다.

다음과 같은 다섯 클래스들을 작성하게 됩니다.

  • Chopstick 클래스
  • 식탁 위의 젓가락을 실제 소유하며 요청에 따라 철학자에게 제공하는 역할을 하는 ChopstickProvider 
  • 생각하고 먹는 에이전트 역할의 Philosopher 클래스. 이 클래스는 한쌍의 ChopstickProvider와만 소통합니다.
  • 생각하고 먹는 상태를 나타내는 PhilosopherState 열거형
  • 젓가락들과 철학자들이 배치될 Table 클래스

이 과정에서 다음과 같은 AAL의 클래스 및 함수들을 이용합니다.
  • Concurrency::agent - 에이전트 기반 클래스
  • 이하는 메시지 블록에 속하는 여러 타입들
    • Concurrency::unbounded_buffer
    • Concurrency::overwrite_buffer
    • Concurrency::join
    • Concurrency::call
  • 이상의 메시지 블록들에 메시지를 주고 받는데 사용하는 함수들
    • Concurrency::send
    • Concurrency::asend - 위의 비동기 버전으로, 받음 여부를 확인하지 않고 바로 리턴
    • Concurrency::receive

본격적인 구현 과정은 다음 회에 계속됩니다~ ^^

원래 저번 주에 글을 올릴 예정이었으나 근래에 제 몸 상태와 집 PC 상태가 메롱이 되어버려 한 주 늦게 글을 올립니다(혹시 기다리고 계시는 분이 있었는지 모르겠네요 ^^;;; )



for 문의 병렬화 

이번에는 PPL의 세 개의 알고리즘 중 parallel_for 알고리즘에 대해서 이야기 하겠습니다.

앞 글에서 간단하게 설명했듯이 parallel_for는 그 이름을 보면 유추 할 수 있듯이 for 문을 병렬화 한 알고리즘입니다.

 

아주 많은 횟수로 반복 작업을 해야할 때 하나의 스레드로 처리하는 것보다는 여러 스레드로 동시에 처리하면 훨씬 빨라지는 것은 당연하겠죠? 바로 이 때 사용하면 좋습니다.

하지만 parallel_for 알고리즘은 아무 곳에나 사용할 수는 없습니다. 루프의 반복 계산 사이에 리소스를 공유하지는 않으면서 루프의 본체가 있는 경우 사용하면 편리합니다.

( 앞의 계산 결과를 다음 계산에서 사용해야 된다면 병렬로 실행하기 힘듭니다 )

 

 

parallel-for의 원형

 

두 개의 오버로드 버전이 있습니다.

 

template < typename _Index_type, typename _Function >

_Function parallel_for( _Index_type _First,  _Index_type _Last, _Function _Func );

_Index_type _First : 시작 위치

_Index_type _Last : 마지막 위치

_Function _Func : 병렬 처리로 사용할 함수

 

 

template < typename _Index_type, typename _Function >

_Function parallel_for( _Index_type _First, _Index_type _Last, _Index_type _Step, _Function _Func );

_Index_type _First : 시작 위치

_Index_type _Last : 마지막 위치

_Index_type _Step : 증분 값

_Function _Func : 병렬 처리로 사용할 함수

 

파라미터 값을 보면 for에서 사용하는 것과 비슷하다는 것을 알 수 있을겁니다. 차이점은 첫 번째 버전의 경우 증분 값으로 1이 자동으로 사용된다는 것과 마지막 파리미터로 병렬 처리에 사용할 함수를 사용한다는 것입니다.

 

 

for와 비슷하므로 for를 사용하는 대 부분을 prarallel_for로 변경할 수 있습니다. 다만 parallel_for 알고리즘에서는 반복 변수의 현재 값이 _Last 보다 작으면 중단합니다 ( 보통 for 문과 다르게 ‘<’ 조건만 사용합니다 ).

또 _Index_type 입력 파라미터는 정수형이어야만 합니다.

parallel_for 파라미터가 1보다 작은 경우 invalid_argument_Step 예외를 던집니다.

 


 

초 간단 parallel_for 사용 방법

 

1. 필요한 헤더 파일 포함
  #include <ppl.h>


2.
네임 스페이스 선언

  using namespace Concurrency;

 

3. parallel_for에서 호출할 작업 함수 정의

 

4. parallel_for에서 사용할 data set 정의

 

5. parallel_for 사용

 

 

 그럼 아주 간단한 실제 사용 예제 코드를 볼까요?

 

#include <ppl.h>

#include <iostream>

 

using namespace Concurrency;

using namespace std;

 

 

int main()

{

    int CallNum = 0;

    int Numbers[50] = { 0, };


   
parallel_for( 0, 50-1, [&](
int n ) {

        ++CallNum;

        Numbers[n] += CallNum;

       }               

      );

 

    for( int i = 0; i < 50; ++i )

    {

        cout << i << " : " << Numbers[i] << endl;

    }

 

    getchar();

    return 0;

}


 

위 예제는 Numbers라는 int 형 배열의 각 요소에 CallNum 이라는 변수를 더하는 것입니다. 간단하고 확실하게 parallel_for 사용 방법을 보이기 위해 허접한 예제를 만들게 되었음을 양해 바랍니다.^^;;; ( 다음에 기회가 되면 좀 더 멋지고 실용적인 예제를 보여드리도록 하겠습니다 )

예제에서는 코드를 간략화 하기 위해서 parallel_for의 마지막 파리미터로 람다 식을 사용했습니다.

위 예제를 '초 간단 parallel_for 사용 방법'의 순서에 비추어보면 아래 그림과 같습니다.

 

 


예제를 실행하면 아래와 같은 결과가 나옵니다.

 

(길어서 일부만 캡쳐 했습니다)

 

실행 결과를 보면 Numbers 배열의 각 요소의 값이 순서대로 증가되지 않았다라는 것을 알 수 있습니다. 만약 보통의 for 문이라면 Numbers[0] 1, Numbers[1] 2 라는 값으로 됩니다. 그러나 parallel_for는 병렬적으로 실행되므로 순서가 지켜지지 않습니다. CallNum 라는 변수는 parallel_for의 모든 스레드에서 접근하는 공유 변수이므로 동기화 되지 않았다라는 것도 유의해야 합니다.

 

Parallel_for를 사용할 때 순서대로 실행하지 않고, 공유 변수는 동기화 되지 않음을 잊지마시기를 바랍니다.

 

이것으로 (너무?)간단하게 parallel_for에 대해서 알아 보았습니다. 다음에는 parallel_for_each에 대해서 설명하겠습니다.




수정

1. 덧글의 ivyfore님이 알려주신대로

parallel_for( 0, 50-1, [&]( int n )가 아닌

 parallel_for( 0, 50, [&]( int n ) 가 되어야 합니다.

Parallel Patterns Library(PPL) - 병렬 알고리즘

VC++ 10 Concurrency Runtime 2009. 8. 19. 13:00 Posted by 알 수 없는 사용자

Parallel Patterns Library(이하 PPL)에는 데이터 컬렉션을 대상으로 쉽게 병렬 작업을 할 수 있게 해 주는 알고리즘이 있습니다. 이 알고리즘들은 생소한 것들이 아니고 C++의 표준 템플릿 라이브러리(STL)에서 제공하는 알고리즘과 비슷한 모양과 사용법을 가지고 있습니다.

( *데이터 컬렉션은 데이터 모음으로 배열이나 STL 컨테이너를 생각하면 됩니다 )

 

 

PPL에서 제공하는 병렬 알고리즘은 총 세 개가 있습니다.

 

1. parallel_for        알고리즘

2. parallel_for_each 알고리즘

3. parallel_invoke    알고리즘

 

 

세 개의 알고리즘 중 3 parallel_invoke만 생소하지 1번과 2번은 앞의 ‘parallel_’이라는 글자만 빼면 ‘for’‘for_each’ C++로 프로그래밍할 때 자주 사용하는 것이므로 친숙하게 느껴질 겁니다.

실제 병렬 여부만 제외하면 우리가 알고 있는 것들과 비슷한 동작을 합니다. 그래서 쉽게 배울 수 있고 기존의 코드에 적용하기도 쉽습니다.

 


parallel_for 알고리즘은 일반적인 for문을 사용할 때와 비슷하게 데이터 컬렉션에서 시작할 위치와 마지막 위치, 증가분(생략 가능합니다)에 해야할 작업 함수를 파라미터로 넘기면 됩니다. 사용 방법에서 for문과 다른 점은 작업 함수를 넘긴다는 점입니다.

 

parallel_for_each 알고리즘은 기존 for_each와 거의 같습니다. 데이터 컬렉션에서 시작할 위치, 마지막 위치, 작업 함수를 파라미터로 넘기면 됩니다. parallel_for의 경우 기존의 for문을 사용할 때는 작업 함수를 파라미터로 넘기지 않기 때문에 기존 for 문에 비해서 구조가 달라지지만 parallel_for_each는 기존 for_each와 파라미터 사용 방법이 같기 때문에 알고리즘의 이름만 바꾸면 될 정도입니다.

 

parallel_invoke 알고리즘 이전 회에 설명한 태스크 그룹과 비슷한면이 있습니다. 태스크 그룹과의 큰 차이점은 병렬로 할수 있는 작업은 10개로 제한 되지만 사용 방법은 태스크 그룹보다 더 간결한 점입니다다. 병렬 작업의 개수가 10개 이하인 경우 태스크 그룹보다 parallel_invoke를 사용하는 것이 훨씬 더 적합하다고 생각합니다.

 

 

 

 

이번은 간단하게 PPL에 있는 세 가지 병렬 알고리즘을 소개하는 것으로 마칩니다. 다음 회부터는 이번에 소개했던 세 개의 알고리즘을 하나씩 하나씩 자세하게 설명하겠습니다.

Parallel Patterns Library(PPL) - Task

VC++ 10 Concurrency Runtime 2009. 8. 18. 00:27 Posted by 알 수 없는 사용자
이번 글은 길이가 좀 깁니다. 내용은 복잡한 것이 아니니 길다고 중간에 포기하지 마시고 쭉 읽어주세요^^


이전 회에서는 PPL에 대한 개념을 간단하게 설명했고, 이번에는 PPL의 세가지 feature 중 태스크(Task)에 대해서 설명하려고 합니다. 태스크에 대한 설명은 이미 이전에 정재원님께서 블로그를 통해서 설명한 적이 있습니다. 정재원님의 글은 태스크 사용 예제 코드를 중심으로 설명한 것으로 저는 그 글에서 빠진 부분과 기초적인 부분을 좀 더 설명하려고 합니다.

 

태스크라는 것은 작업 단위라고 생각하면 좋을 것 같습니다. 작업이라는 것은 여러 가지가 될 수 있습니다. 피보나치 수 계산, 배열에 있는 숫자 더하기, 그림 파일 크기 변경 등 작고 큰 작업이 있습니다. 보통 크기가 큰 작업은 이것을 작은 작업 단위로 나누어 병렬 처리를 하기도 합니다.

 

PPL의 태스크는 작업을 그룹 단위로 묶어서 병렬로 처리하고 대기 및 취소를 할 수 있습니다.

 

 


태스크 핸들

태스크 핸들은 각각의 태스크 항목을 가리키며 PPL에서는 task_handle 클래스를 사용합니다. 이 클래스는 람다 함수 또는 함수 오브젝트 등을 태스크를 실행하는 코드로 캡슐화 합니다. 태스크 핸들은 캡슐화 된 태스크 함수의 유효 기간을 관리하기 때문에 중요합니다. 예를들면 태스크 그룹에 태스크 핸들을 넘길 때는 태스크 그룹이 완료 될때까지 유효해야합니다.


보통 태스크 관련 예제 코드를 보면 task_handle 대신 C++0x의 auto를 사용하는 편이 코드가 더 간결해지므로 task_handle 보다는 auto를 사용하고 있습니다.


 

 

unstructured structured Task Groups

태스크 그룹은 unstructured structured 두 개로 나누어집니다.

두개의 태스크 그룹의 차이는 스레드 세이프하냐 안하느냐의 차이입니다.

unstructured는 스레드 세이프 하고 structured는 스레드 세이프 하지 않습니다.


태스크 관련 예제에 자주 나오는 task_group 클래스는 unstructured 태스크 그룹이고, structured_task_group 클래스는 structured 태스크 그룹을 뜻합니다.

 

unstructured 태스크 그룹은 structured 태스크 그룹보다 유연합니다. 스레드 세이프 하며 작업 중 taks_group::wait를 호출하여 대기한 후 태스크를 추가한 후 실행할 수 있습니다. 그렇지만 성능면에서 structured 태스크 그룹이 스레드 세이프 하지 않으므로 unstructured 태스크 그룹보다 훨씬 더 좋으므로 적절하게 선택해서 사용해야 합니다.

 

structured 작업 그룹은 스레드 세이프 하지 않기 때문에 Concurrency Runtime에서는 몇가지 제한이 있습니다.

- structured 작업 그룹 안에 다른 structured 작업 그룹이 있을 경우 내부의 작업 그룹은 외부의 작업 그룹보다 먼저 완료해야 한다.

- structured_task_group::wait 멤버를 호출한 후에는 다른 작업을 추가한 후 실행할 수 없다.


 

 

초간단!!! 6단계로 끝내는 태스크 사용 방법


1. ppl.h 파일을 포함합니다.

   #include <ppl.h>

 

2. Concurrency Runtime의 네임 스페이를 선언합니다.

   using namespace Concurrency;

 

3. 태스크 그룹을 정의합니다.

  structured_task_group structured_tasks;

 

4. 태스크를 정의합니다.

  auto structured_task1 = make_task([&] { Plus(arraynum1, true); } );

 

5. 태스크를 태스크 그룹에 추가한 후 실행합니다.

  structured_tasks.run( structured_task1 );

 

6. 태스크 그룹에 있는 태스크가 완료될 때까지 기다립니다.

  structured_tasks.wait();

 

위의 순서대로 하면 태스크를 사용할 수 있습니다. 태스크 사용 참 쉽죠잉~ ^^.

참고로 여러 개의 태스크를 그룹에 추가하고 싶다면 6번 이전에 4번과 5번을 추가할 개수만큼 반복하면 됩니다.


* 4번의 Plus(arraynum1, true);는 하나의 태스크에서 실행할 함수입니다.

 


PPL의 태스크를 사용하면 병렬 프로그래밍을 간단한 6단계만으로 끝낼 수 있습니다. 만약 현재의 Win32 API로 이것을 구현하기 위해서는 학습에 많은 시간을 보낸 후 저수준의 API를 사용하여 구현해야 되기 때문에 구현 시간과 안정성에서 PPL의 태스크보다 손해를 봅니다.




태스크 그룹과 스레드 세이프

unstructured structured 태스크 그룹의 차이가 스레드 세이프 유무의 차이라고 했는데 이 말은

unstructured 태스크 그룹은 복수의 스레드에서 호출 및 대기를 할 수 있지만 structured 태스크 그룹은 그것을 생성한 스레드에서만 호출 및 대기를 할 수 있습니다.


예를 들면 스레드 A, 스레드 B가 있는 경우 스레드 A와 B에서 태스크를 실행 후 대기를 한다면 unstructured 태스크 그룹을 사용해야하고, 오직 하나의 스레드에서만(스레드 A에서만) 태스크를 실행 후 대기를 한다면 structured 태스크 그룹을 사용합니다.


스레드 세이프는 스레드 세이프 하지 않는 것보다 오버헤드가 발생합니다. 즉 스레드 세이프 버전은 스레드 세이프 하지 않은 버전보다 성능이 떨어진다는 것이죠.

그러니 태스크 그룹을 어떤 방식으로 사용할지 파악 후 스레드 세이프 필요성에 따라서 unstructured 태스크 그룹과 structured 태스크 그룹 중 상황에 알맞은 것을 선택해서 사용해야 합니다.




ps : 제가 8월 14일 글을 공개할 때 태스크 그룹의 스레드 세이프 특성을 잘 못 이해하여 잘못된 내용을 전달하였습니다. 그래서 오늘 글을 다시 수정하였습니다. ;;;;;;

다음부터는 틀린 글을 올리지 않도록 조심하겠습니다. ^^;;;;;;

양보할 줄 아는 Concurrency Runtime의 event

VC++ 10 Concurrency Runtime 2009. 8. 7. 18:48 Posted by 알 수 없는 사용자
병행 런타임 관련 두번째 예제로 event를 이용합니다. 여기서 이벤트는 뮤텍스 등과 같은 동기화 개체의 하나로 기존 Win32에서의 수동리셋이벤트(manual-reset event)와 같은 것을 말합니다.

기존 이벤트와 병행 런타임에서 제공하는 이벤트에는 한가지 중요한 차이점이 있습니다. 새로운 이벤트는 병행 런타임을 인식하여 작업이 블록되는 경우 다른 작업에 스레드를 양보(yield)합니다. 무조건 선점형으로 동작하는 기존 Win32 이벤트보다 더 지능적이고 효율적으로 동작하는 것이죠.

자, 그럼 코드를 살펴봅시다. 이 예제에서는 스케줄러의 병렬성을 2로 제한하고 그보다 많은 수의 작업을 병행 수행할 때, 기존 Win32 이벤트와 병행 런타임 이벤트를 각각 활용하는 경우 어떤 차이가 있는지 보여줍니다.

    1 // event.cpp : Defines the entry point for the console application.

    2 //

    3 // compile with: /EHsc

    4 #include <windows.h>

    5 #include <concrt.h>

    6 #include <concrtrm.h>

    7 #include <ppl.h>

    8 

    9 using namespace Concurrency;

   10 using namespace std;

   11 

   12 class WindowsEvent

   13 {

   14     HANDLE m_event;

   15 public:

   16     WindowsEvent()

   17         :m_event(CreateEvent(NULL,TRUE,FALSE,TEXT("WindowsEvent")))

   18     {

   19     }

   20 

   21     ~WindowsEvent()

   22     {

   23         CloseHandle(m_event);

   24     }

   25 

   26     void set()

   27     {

   28         SetEvent(m_event);

   29     }

   30 

   31     void wait(int count = INFINITE)

   32     {

   33         WaitForSingleObject(m_event,count);

   34     }

   35 };

   36 

   37 template<class EventClass>

   38 void DemoEvent()

   39 {

   40     EventClass e;

   41     volatile long taskCtr = 0;

   42 

   43     //태스크그룹을 생성하고 여러 태스크 사본을 스케줄링합니다.

   44     task_group tg;

   45     for(int i = 1;i <= 8; ++i)

   46         tg.run([&e,&taskCtr]{           

   47 

   48       //작업 부하를 시뮬레이션합니다.

   49             Sleep(100);

   50 

   51             //태스크 카운터를 증가시킵니다.

   52             long taskId = InterlockedIncrement(&taskCtr);

   53             printf_s("\tTask %d waiting for the event\n", taskId);

   54 

   55             e.wait();

   56 

   57             printf_s("\tTask %d has received the event\n", taskId);

   58 

   59     });

   60 

   61     //이벤트를 셋하기 전에 충분히 시간을 보냅니다.

   62     Sleep(1500);

   63 

   64     printf_s("\n\tSetting the event\n");

   65 

   66     //이벤트를 셋

   67     e.set();

   68 

   69     //작업들의 완료를 대기

   70     tg.wait();

   71 }

   72 

   73 int main ()

   74 {

   75     //스레드 둘만을 활용하는 스케줄러를 생성합니다.

   76     CurrentScheduler::Create(SchedulerPolicy(2, MinConcurrency, 2, MaxConcurrency, 2));

   77 

   78     //협력적 이벤트를 사용할 경우, 모든 작업들이 시작됩니다.

   79     printf_s("Cooperative Event\n");

   80     DemoEvent<event>();

   81 

   82     //기존 이벤트를 사용하면, Win7 x64 환경이 아닌한

   83     //ConcRT가 블록 상황을 인식하지 못하여 첫 두 작업만이 시작됩니다.

   84     printf_s("Windows Event\n");

   85     DemoEvent<WindowsEvent>();

   86 

   87     return 0;

   88 }


WindowsEvent 클래스는 기존 Win32 이벤트를 위한 랩퍼(wrapper) 클래스입니다. DemoEvent 함수 템플릿이 사용할 이벤트 형을 템플릿 인자로 받아 실제 작업을 하는 놈입니다. PPL(Parallel Patterns Library)의 task를 이용해 8개 작업을 만들고 각각에서 이벤트를 기다리도록 하고 있습니다.

결과는 다음과 같이 나올 겁니다.


병렬성이 둘로 제한되는 상황에서도 병행 런타임의 이벤트를 사용할 경우 각 작업이 블록될 경우 다른 작업에 스레드를 양보하기 때문에 8개의 작업이 모두 시작되는 것을 확인하실 수 있습니다. 반면, 기존 이벤트의 경우 두 작업만이 시작되었다가 이벤트를 받고 두 작업이 종료된 후에나 다른 작업들이 시작되는 것을 확인하실 수 있습니다.

Parallel Patterns Library (PPL)

VC++ 10 Concurrency Runtime 2009. 8. 6. 06:00 Posted by 알 수 없는 사용자

이제 본격적으로 VC++ 10의 병렬 프로그래밍에 대한 이야기를 시작합니다.

첫 번째는 이름만 들어도 딱 '병렬 프로그래밍' 이라는느낌을 주고 가장 많이 사용될 것으로 생각하는 Parallel Patterns Library (PPL)입니다정말 이름에서 딱 느낌이 오죠 ^^



PPL은 크게 세 개의 features로 나누어집니다.

1. Task Parallelism : 병렬적으로 여러 가지 작업 처리

2. Parallel algorithms : 데이터 컬렉션을 제너릭 알고리즘으로병렬 처리

3. Parallel containers and objects :concurrent 접근이 가능한 제너릭 컨테이너

 


PPL 모델은 C++의 Standard Template Library(STL)과비슷합니다.

예를 들면 STL에는 for_each 라는 것이 있는데 PPL에는 이것의 병렬 버전인 parallel_for_each가 있습니다. 뒤에 설명하겠지만 parallel_for_each에 대해서 간단하게 말하면 array의 항목을 순회하는 parallel 알고리즘입니다.



PPL을 사용하기 위해서는 먼저 namespace Concurrency를 선언한 후 ppl.h 파일을 포함합니다.
........
#include <ppl.h>

using namespace Concurrency;
..............


먼저 parallel_for_each를 사용한 코드를 보여 드리겠습니다. parallel_for_each는 다음에 자세히 설명하겠으니 이번은 PPL 이라는 것이 어떻게 사용하는지만 아래 코드를 통해서 보세요^^

< 리스트 1. parallel_for_each 예제 >

#include <ppl.h>

#include <array>

#include <algorithm>

 

using namespace std;

using namespace std::tr1;

using namespace Concurrency;

 

int main()

{

   // Create anarray object that contains a few elements.

   array<int, 3> a = {13, 26, 39};

 

   // Use thefor_each algorithm to perform an operation on each element

   // of the arrayserially.

  for_each(a.begin(), a.end(), [&](int n) {

      // TODO:Perform some operation on n.

   });

 

   // Use theparallel_for_each algorithm to perform the same operation

   // in parallel.

  parallel_for_each(a.begin(), a.end(), [&](int n) {

      // TODO:Perform some operation on n.

   });

}


<리스트 1>의 코드를 보면 람다를 사용한 부분도 보이죠? 예전에 제가 C++0x의 새로운 기능에 의해 C++의 성능과 표현력이 향상 되었다고 이야기 했습니다. 이런 장점들이 PPL에 많은 기여를 하였습니다.




PPL과 OpenMP

예전에 PPL이 MSDN 매거진을 통해서 공개 되었을 때 많은 분들이 OpenMP와 비슷하게 보시고 왜 기존에 있는 것과 같은 것을 또 만드냐 라는 이야기를 하는 것을 들은 적이 있습니다.

PPL과 OpenMP는 같은 것이 아닙니다. 표현 방법이 얼핏 비슷하게 보일지 몰라도 개념이나 기반은 많이 다릅니다.

OpenMP는 pragma 지신문이고 PPL은 순수 C++ 템플릿으로 만들어진 라이브러리입니다.
그래서 PPL은 표현성과 유연성이 OpenMP에서 비해서 훨씬 더 뛰어납니다.
또한 PPL은 Concurrency Runtime 기반 위에 구축되므로 동일한 런타임을 기반으로 하는 다른 라이브러리와 잠재적 상호 운용성이 제공됩니다.

PPL은 어떤 것인지, 왜 OpenMP 보다 더 좋은지 이후에 제가 적을 글을 보면 쉽게 알 수 있으리라 생각합니다.


오늘은 PPL의 개념에 대한 이야기로 마치고 다음에는 PPL의 하나인 task에 대해서 이야기 하겠습니다.
시간 여유가 있거나 task에 대해서 빨리 알고 싶은 분들은 일전에 정재원님이 task 예제를 설명한 글을 올린 적이 있으니 먼저 그것을 보면서 예습을 하는 것도 좋습니다.