Asynchronous Agents Library 소개

VC++ 10 Concurrency Runtime 2010. 5. 29. 09:00 Posted by 알 수 없는 사용자

Asynchronous Agents Library 소개

작성자: 임준환( mumbi at daum dot net )

 

Concurrency Runtime 의 컴포넌트

[ 그림1. Concurrency Runtime Architecture ]

[ 그림1. Concurrency Runtime Architecture ]


 Asynchronous Agents Library 는 위 그림에서 보듯이 Concurrency Runtime 프레임워크 안에서 돌아가는 내부 컴포넌트 중 하나입니다.

 라이브러리라는 명칭에서 독립적으로 수행될 것 같은 오해를 가질 수 있으나, 사실은 Concurrency Runtime 의 Task Scheduler 와 Resource Manager 를 기반으로 만들어졌습니다.

 Concurrency Runtime 에서 Task Scheduler 와 Resource Manager 를 하위 레벨 컴포넌트라고 본다면, Asynchronouse Agent Library( 이하, AAL ) 와 Parallel Patterns Library( 이하, PPL ) 은 상위 레벨 컴포넌트라고 볼 수 있습니다. 다시 말해, AAL, PPL 모두 Task Scheduler 와 Resource Manager 를 사용한다는 말입니다.

 Concurrency Runtime 이 아닌 다른 프레임워크에서도 하위 레벨보다 상위 레벨 컴포넌트가 사용 용이성과 안정성이 높은 것처럼 Concurrency Runtime 에서도 마찬가지입니다.

 Concurrency Runtime 개발자들은 비 동기 작업들을 하기 위해서는 AAL, 단위 작업의 병렬 처리를 하기 위해서는 PPL 을 사용하기를 권하고, 좀 더 특별한 최적화나 하위 레벨 작업이 필요한 경우에만 직접 Task Scheduler 를 사용하기를 권하고 있습니다.

 즉, AAL 은 Concurrency Runtime 의 비 동기 작업을 위한 인터페이스라고 볼 수 있습니다.( PPL 은 병렬 처리를 위한 인터페이스라고 볼 수 있습니다. )

 

비 동기 작업

 비 동기 작업이란 작업이 끝날 때까지 기다리지 않는다는 말입니다. 즉, 어떤 함수가 있을 때, 그 함수는 바로 반환되어야 합니다. 바로 반환되면 호출한 스레드는 바로 다른 작업을 수행할 수 있습니다. 이 때, 해당 함수의 작업은 호출한 스레드가 아니라 다른 작업 스레드에서 수행되고, 작업이 완료되면 호출한 스레드에게 통지하거나, 콜백( call back ) 함수를 호출합니다.

 이러한 처리 방식을 비 동기 처리 방식이라고 합니다. AAL 은 이런 비 동기 작업을 용이하게 해주는 라이브러리입니다.

 

기능

 AAL 은 크게 두 가지 특징을 가지고 있습니다. 하나는 actor-based 프로그래밍 모델이고, 다른 하나는 message passing 프로그래밍 모델입니다.

Agent

 Agent 는 AAL 의 기능 중 하나인 actor-based 프로그래밍 모델의 actor 를 나타내는 개념입니다. 인공 지능 등과 같은 다른 분야에서 사용하는 개념과 마찬가지로 어떤 임무를 수행하는, 즉 어떤 역할을 하는 객체를 일컫습니다.

 예를 들어, 스마트 폰이 만들어 지려면 케이스( case ) 생산, 하드웨어 생산, 케이스와 하드웨어 조립, OS 및 소프트웨어 설치, 테스트와 같은 공정이 필요한데, 각 공정들을 수행하는 객체들을 agent 라고 볼 수 있습니다.

Message

 Message 는 agent 들 간의 통신을 위한 메커니즘입니다. AAL 에서의 message 란 상황이나 상태를 알리기 위한 수단이 될 수도 있고, 실제 데이터를 전송하기 위한 수단이 될 수도 있습니다.

 예를 들면, 스마트 폰 공정들 사이에 케이스가 생산되었다면 생산된 케이스를 조립하는 곳으로 전달해야 합니다. 이 때, 케이스를 전달하는 수단으로 message 가 사용될 수 있습니다. 또한 케이스와 하드웨어의 조립, OS 및 소프트웨어 설치가 완료되어야지 테스트를 진행할 수 있습니다. 이런 완료 상태와 같은 작업의 상태를 알리기 위해서 message 가 사용될 수 있습니다.

 

예제

 위에 언급한 스마트 폰 생산 과정을 AAL 을 이용하여 구현해보았습니다. 예제가 비교적 긴 편이지만 최대한 이해하기 쉽게 직관적인 코드를 작성했습니다. AAL 을 중점적으로 소개하기 위해 디자인이나 테크닉은 무시하고 작성하였습니다.

 전체적인 흐름을 이해하는 데에는 어려움이 없을 것 같으나 사용된 함수들이 어떤 역할을 하는지 궁금할 것입니다.

시나리오

  1.  스마트 폰 케이스 생산자와 하드웨어 생산자가 각각 케이스와 하드웨어를 부품으로 생산한다.
  2.  생산된 부품을 조립하는 객체에게 전달하면 조립하는 객체가 조립하게 된다.
  3.  조립된 스마트 폰은 소프트웨어 설치 객체에게 전달되고, 그 객체는 소프트웨어를 설치한다.
  4.  소프트웨어가 설치된 스마트 폰은 테스터에게 전달되고, 테스터는 테스트에 성공한 스마트 폰을 제품 컨테이너에 저장한다.
  5.  제품 컨테이너의 제품들을 출력하고 종료한다.

[ 그림2. 예제 시나리오 ]

[ 그림2. 예제 시나리오 ]


코드

// 스마트폰 생산 예제 코드
#include <iostream>
#include <agents.h>
#include <ppl.h>

using namespace std;
using namespace Concurrency;

// 케이스 클래스
class Case
{
};

// 하드웨어 클래스
class Hardware
{
};

// 소프트웨어 클래스
class Software
{	
};

// 스마트폰 클래스
class SmartPhone
{
public:
	int			uid;

	Case*		pCase;
	Hardware*	pHardware;
	Software*	pSoftware;

	~SmartPhone()
	{
		if( 0 != this->pCase )
		{
			delete this->pCase;
			this->pCase = 0;
		}

		if( 0 != this->pHardware )
		{
			delete this->pHardware;
			this->pCase = 0;
		}

		if( 0 != this->pSoftware )
		{
			delete this->pSoftware;
			this->pSoftware = 0;
		}		
	}

	SmartPhone( int _uid, Case* _pCase = 0, Hardware* _pHardware = 0, Software* _pSoftware = 0 )
		: uid( _uid )
		, pCase( _pCase )
		, pHardware( _pHardware )
		, pSoftware( 0 )
	{
		if( 0 != _pSoftware )
			this->pSoftware = new Software( *_pSoftware );		
	}		

	void SetSoftware( Software* _pSoftware )
	{
		if( 0 != this->pSoftware )
			delete this->pSoftware;

		this->pSoftware = new Software( *_pSoftware );
	}
};

// 메시지 버퍼 typedef
typedef unbounded_buffer< Case* >		CaseBuffer;
typedef unbounded_buffer< Hardware* >	HardwareBuffer;
typedef unbounded_buffer< SmartPhone* >	SmartPhoneBuffer;

typedef vector< SmartPhone* >			SmartPhoneVector;

// 케이스 생산자 agent
class CaseProducer
	: public agent
{
private:
	unsigned int	caseCountToCreate;
	CaseBuffer&		caseBuffer;	

protected:
	void run()
	{
		// 필요한 개수 만큼 케이스를 생산하고 케이스 버퍼로 보낸다.
		for( unsigned int i = 0; i < this->caseCountToCreate; ++i )
		{
			send( this->caseBuffer, new Case );
			wcout << i << L": created a case." << endl;
		}

		// 모두 보냈으면 생산을 완료했다는 메시지로 널( 0 ) 포인터를 보낸다.
		send( this->caseBuffer, static_cast< Case* >( 0 ) );

		done();
	}

public:	
	CaseProducer( unsigned int _caseCountToCreate, CaseBuffer& _caseBuffer )
		: caseBuffer( _caseBuffer )
		, caseCountToCreate( _caseCountToCreate ) { }	
};

// 하드웨어 생산자 agent
class HardwareProducer
	: public agent
{	
private:
	unsigned int		hardwareCountToCreate;
	HardwareBuffer&		hardwareBuffer;

protected:
	void run()
	{
		// 필요한 개수 만큼 하드웨어를 생산하고 하드웨어 버퍼로 보낸다.
		for( unsigned int i = 0; i < this->hardwareCountToCreate; ++i )
		{
			send( this->hardwareBuffer, new Hardware );
			wcout << i << L": created a hardware." << endl;
		}

		// 모두 보냈으면 생산을 완료했다는 메시지로 널( 0 ) 포인터를 보낸다.
		send( this->hardwareBuffer, static_cast< Hardware* >( 0 ) );

		done();		
	}

public:
	HardwareProducer( unsigned int _hardwareCountToCreate, HardwareBuffer& _hardwareBuffer )
		: hardwareCountToCreate( _hardwareCountToCreate )
		, hardwareBuffer( _hardwareBuffer ) { }
};

// 부품( 케이스와 하드웨어)을 조립하는 agent
class Assembler
	: public agent
{
private:
	CaseBuffer&			caseBuffer;
	HardwareBuffer&		hardwareBuffer;

	SmartPhoneBuffer&	incompletedSmartPhoneBuffer;

protected:
	void run()
	{
		unsigned int loopCount = 0;

		// 버퍼에 쌓인 부품( 케이스와 하드웨어 )을 꺼내 조립하여 스마트폰을 만든다.
		while( true )
		{
			Case* pCase = receive( this->caseBuffer );
			Hardware* pHardware = receive( this->hardwareBuffer );

			// 더 이상 조립할 부품( 케이스 또는 하드웨어 )들이 없으면, 
			// 스마트폰 생산이 완료되었음을 알리는 메시지로 널( 0 ) 포인터를 보낸다.
			if( 0 == pCase || 0 == pHardware )
			{
				// 남은 케이스를 파괴한다.
				if( 0 != pCase )
				{
					while( true )
					{
						Case* pGarbageCase = receive( this->caseBuffer );

						if( 0 == pGarbageCase )
							break;

						delete pGarbageCase;
					}					
				}

				// 남은 하드웨어를 파괴한다.
				if( 0 != pHardware )
				{
					while( true )
					{
						Hardware* pGarbageHardware = receive( this->hardwareBuffer );

						if( 0 == pGarbageHardware )
							break;

						delete pGarbageHardware;
					}
				}

				send( this->incompletedSmartPhoneBuffer, static_cast< SmartPhone* >( 0 ) );
				break;
			}

			send( this->incompletedSmartPhoneBuffer,
				new SmartPhone( loopCount + 1, pCase, pHardware ) );

			wcout << loopCount << L": created a smart phone." << endl;

			++loopCount;
		}
		
		done();
	}

public:
	Assembler( CaseBuffer& _caseBuffer, HardwareBuffer& _hardwareBuffer, 
		SmartPhoneBuffer& _incompletedSmartPhoneBuffer )
		: caseBuffer( _caseBuffer )
		, hardwareBuffer( _hardwareBuffer )
		, incompletedSmartPhoneBuffer( _incompletedSmartPhoneBuffer ) { }	
};

// 소프트웨어 설치 agent
class SoftwareInstaller
	: public agent
{	
private:
	Software&			software;	

	SmartPhoneBuffer&	incompletedSmartPhoneBuffer;
	SmartPhoneBuffer&	completedSmartPhoneBuffer;

	SmartPhoneVector&	faultySmartPhoneArray;

protected:
	void run()
	{
		unsigned int loopCount = 0;

		// 조립된 스마트폰에 소프트웨어를 설치한다.
		while( true )
		{
			SmartPhone* pSmartPhone = receive( this->incompletedSmartPhoneBuffer );

			// 더 이상 소프트웨어를 설치할 조립된 스마트폰이 없으면, 설치를 중단하고
			// 소프트웨어 설치도 모두 완료되었음을 알리는 메시지로 널( 0 ) 포인터를 보낸다.
			if( 0 == pSmartPhone )
			{
				send( this->completedSmartPhoneBuffer, static_cast< SmartPhone* >( 0 ) );
				break;
			}

			wcout << loopCount;

			// 제대로 조립되었는지 판단 후, 소프트웨어를 설치한다. 
			if( 0 != pSmartPhone->pCase && 0 != pSmartPhone->pHardware )
			{
				pSmartPhone->SetSoftware( &this->software );

				send( this->completedSmartPhoneBuffer, pSmartPhone );
				wcout << L": installed the software." << endl;
			}
			else
			{
				this->faultySmartPhoneArray.push_back( pSmartPhone );
				wcout << L": failed to install the software." << endl;
			}

			++loopCount;
		}		

		done();
	}

public:
	SoftwareInstaller( Software& _software, SmartPhoneBuffer& _incompletedSmartPhoneBuffer,
		SmartPhoneBuffer& _completedSmartPhoneBuffer, SmartPhoneVector& _faultySmartPhoneArray )
		: software( _software )
		, incompletedSmartPhoneBuffer( _incompletedSmartPhoneBuffer )
		, completedSmartPhoneBuffer( _completedSmartPhoneBuffer )
		, faultySmartPhoneArray( _faultySmartPhoneArray ) { }
};

// 테스트하는 agent
class Tester
	: public agent
{
private:
	SmartPhoneBuffer&	completedSmartPhoneBuffer;

	SmartPhoneVector&	productArray;
	SmartPhoneVector&	faultySmartPhoneArray;

protected:
	void run()
	{
		unsigned int loopCount = 0;

		// 조립된 스마트폰에 소프트웨어 설치가 완료된 스마트폰을 테스트한다.
		while( true )
		{
			SmartPhone* pSmartPhone = receive( this->completedSmartPhoneBuffer );

			// 더 이상 테스트할 스마트폰이 없으면 중단한다.
			if( 0 == pSmartPhone )				
				break;

			wcout << loopCount;

			if( this->Test( pSmartPhone ) )
			{
				this->productArray.push_back( pSmartPhone );
				wcout << L": succeeded in testing." << endl;
			}
			else
			{
				this->faultySmartPhoneArray.push_back( pSmartPhone );
				wcout << L": failed to test." << endl;
			}

			++loopCount;
		}
		
		done();
	}

public:
	Tester( SmartPhoneBuffer& _completedSmartPhoneBuffer,
		SmartPhoneVector& _productArray, SmartPhoneVector& _faultySmartPhoneArray )
		: completedSmartPhoneBuffer( _completedSmartPhoneBuffer )
		, productArray( _productArray )
		, faultySmartPhoneArray( _faultySmartPhoneArray ) { }

	bool Test( SmartPhone* _pSmartPhone )
	{
		return ( 0 != _pSmartPhone->pCase && 0 != _pSmartPhone->pHardware
			&& 0 != _pSmartPhone->pSoftware );		
	}	
};

int main()
{
	// 케이스 생산자 agent 생성.
	int caseCountToCreate = 10;
	CaseBuffer caseBuffer;
	CaseProducer caseProducer( caseCountToCreate, caseBuffer );

	// 하드웨어 생산자 agent 생성.
	int hardwareCountToCreate = 10;
	HardwareBuffer hardwareBuffer;
	HardwareProducer hardwareProducer( hardwareCountToCreate, hardwareBuffer );

	// 부품( 케이스와 하드웨어 )을 조립하는 agent 생성.
	SmartPhoneBuffer	incompletedSmartPhoneBuffer;
	Assembler assembler( caseBuffer, hardwareBuffer, incompletedSmartPhoneBuffer );	
	
	// 소프트웨어 설치 agent 생성.
	SmartPhoneVector	faultySmartPhoneArray;

	SmartPhoneBuffer	completedSmartPhoneBuffer;
	SmartPhoneBuffer	faultySmartPhoneBuffer;
	Software android;
	SoftwareInstaller softwareInstaller( android, incompletedSmartPhoneBuffer,
		completedSmartPhoneBuffer, faultySmartPhoneArray );

	// 테스트하는 agent 생성.
	SmartPhoneVector	productArray;

	SmartPhoneBuffer	productBuffer;	
	Tester tester( completedSmartPhoneBuffer, productArray, faultySmartPhoneArray );

	// 모든 생성된 agent 작업 시작.
	caseProducer.start();
	hardwareProducer.start();
	assembler.start();
	softwareInstaller.start();
	tester.start();

	// 모든 agent 의 작업이 끝날 때까지 대기.
	agent* watingAgents[] = { &caseProducer, &hardwareProducer, &assembler,
		&softwareInstaller, &tester };

	agent::wait_for_all( sizeof( watingAgents ) / sizeof( agent* ), watingAgents );
	agent::wait( &caseProducer );
	agent::wait( &hardwareProducer );
	agent::wait( &assembler );
	agent::wait( &softwareInstaller );
	agent::wait( &tester );	

	// 완성된 제품 출력.
	wcout << L"completed products: " << endl;

	parallel_for_each( productArray.begin(), productArray.end(),
		[] ( SmartPhone* _pSmartPhone )
	{
		wcout << L"product uid: " << _pSmartPhone->uid << endl;
	});

	// 자원 정리 - 완제품.
	parallel_for_each( productArray.begin(), productArray.end(),
		[] ( SmartPhone* _pSmartPhone )
	{
		if( 0 != _pSmartPhone )
			delete _pSmartPhone;
	});	

	// 자원 정리 - 불량품.
	parallel_for_each( faultySmartPhoneArray.begin(), faultySmartPhoneArray.end(),
		[] ( SmartPhone* _pSmartPhone )
	{
		if( 0 != _pSmartPhone )
			delete _pSmartPhone;
	});
}

[ 코드1. 예제 코드 ]

실행 결과

 모든 agent 가 수행하는 작업이 비 동기적으로 호출되었고 결과를 기다립니다. 비 동기적으로 호출된 agent 의 작업들은 각각 독립적인 스레드에서 동시에 수행되며, 작업을 수행하는데 데이터가 필요하다면 메시지에 의해 필요한 데이터가 도착할 때까지 동기화되어 수행됩니다.

 wcout 객체는 동기화되지 않기 때문에 작업의 넘버링이 엉켜있는데 실제로 세어보면 정확히 맞아 떨어집니다.

[ 그림3. 코드1 실행 결과 ]

[ 그림3. 코드1 실행 결과 ]

메모리 누수

코드에 명시적인 메모리 누수가 없어도 Concurrency Runtime 에 의한 메모리 누수가 발견됩니다. 이것은 개발팀에서도 버그라고 인정하고 있습니다. 하지만 이번 2010 버전에는 수정되지 않을 것이고, 다음 버전인 서비스팩에 수정되어 포함될 것이라고 합니다.

참고: http://vsts2010.net/263

AAL 의 편리함

 위의 예제가 보여주다시피 AAL 을 사용하면 정말 간단하게 여러 객체의 작업을 비 동기적으로 동시에 수행시킬 수 있습니다.

만약 직접 Win32 API 를 사용하여 이를 구현하려 한다면 각 데이터를 보관하는 컨테이너에 대한 동기화 등과 같은 복잡한 메카니즘을 이벤트 등과 같은 동기화 객체를 사용하여 직접 구현해야 합니다.

물론, 결국은 OS의 커널( kernel ) 객체를 사용하여 구현되겠지만, 빠른 생산성의 이익과 동기화로 인한 스트레스로 인해 피폐해져 가는 인간성을 지킬 수 있다는 측면에서는 굉장히 편리한 도구임에 틀림없습니다.

 

마치는 글

 이번 글은 AAL 에 대한 개념과 어떻게 동작하는지 간단한 소개입니다. 다음 글에서 구현에 필요한 함수들과 사용법들을 이야기해 보겠습니다.

 

참고

  • 그림1. Concurrency Runtime Architecture - http://i.msdn.microsoft.com/dynimg/IC315446.png