Asynchronous Agents Library – agent. 2 ( 기능 )

VC++ 10 Concurrency Runtime 2010. 6. 13. 09:00 Posted by 알 수 없는 사용자

Asynchronous Agents Library – agent. 2 ( 기능 )

작성자: 임준환( mumbi at daum dot net )

 

void run();

Agent 클래스를 상속 받아 작업을 하는 agent 를 만들 때, 처음으로 해야 할 일은 run() 재정의입니다.

 run() 는 CPU 가 해당 agent 스레드의 컨텍스트를 처리할 때, 수행되는 메소드입니다. 즉, 바로 agent 가 책임지고 처리해야 할 작업( task )이고, run() 을 재정의하기 위해 agent class 가 존재한다고 해도 과언이 아닐 정도로 중요합니다.

 Asynchronous Agents Library( 이하 AAL )을 사용하지 않고 Win32 API 로 직접 스레드를 생성할 때, 지정하는 콜백 함수와 같은 역할을 합니다.

 run() 에 필요한 정보( 매개 변수 )가 있다면 agent 를 상속 받은 클래스의 생성자를 이용하여 전달하면 됩니다.

 run() 이 호출될 때, agent 의 상태는 agent_started 가 됩니다.

 run() 을 재정의할 때, 주의할 점은 run() 이 끝나기 전에 done() 을 호출해야 한다는 것입니다. 실제로 run() 이 끝났다는 것은 작업이 끝난 것이지만 상태는 여전히 agent_started 이기 때문에 계속 수행 중인 것으로 인식됩니다. 그러므로 agent 의 상태를 바꿔주기 위해 반드시 run() 이 끝나기 전에 done() 을 호출해야 합니다.

또한 run() 은 어떤 예외도 던지지 않습니다.

 

bool done();

Agent 의 작업이 완료되었음을 알립니다. 이것은 agent 의 상태를 agent_done 으로 바꾸는 것을 의미합니다.

제대로 agent_done 상태가 되면 true 를 반환합니다. cancel() 에 의해 agent_cancel 상태인 agentagent_done 상태가 되지 않고 false 를 반환합니다.

 protected 로 지정되어 있어 메소드 내에서만 호출할 수 있습니다.

 

bool start();

 start() 를 호출함으로써 CPU 스케줄에 의해 run() 이 호출되는 것입니다. run() 이 호출되기 위해서는 반드시 start() 를 호출해야 합니다. 직접 run() 을 호출하면 병렬 처리 또는 비 동기 처리되지 않고, 호출한 스레드의 컨텍스트에서 일반 함수를 호출한 것과 같게 됩니다.

그러므로 직접 run() 을 호출하는 일은 없어야 하며, 꼭 start() 를 호출하도록 해야 합니다.

 start() 는 agent 의 상태를 agent_created 에서 agent_runnable 로 바꿉니다. 즉, 스케줄하여 컨텍스트 스위칭( context switching ) 의 대상이 되도록 합니다.

Agent 가 제대로 스케줄 되었다면 true 를 반환합니다. 스케줄 되기 전( start() 호출 전 )에 cancel() 을 호출하면 스케줄 되지 않고 false 를 반환합니다.

 

bool cancel();

Agent 객체의 작업을 취소할 때 사용합니다.

Agent 객체가 생성되어 agent_created 상태가 되거나, start() 에 의해 agent_runnable 상태일 때에 작업을 취소하고 종료된 상태인 agent_cancel 상태로 바꿉니다.

다시 말해, run() 이 호출되어 agent_started 상태에서는 agent_cancel 상태로 바뀌지 않고 실패하여 false 를 반환합니다. 제대로 agent_cancel 상태로 바뀌었다면 true 를 반환합니다.

 

agent_status status();

Agent 객체의 현재 상태를 반환합니다.

 agent_statusenum 형으로 agent_canceled( 취소됨 ), agent_created( 생성됨 ), agent_done( 작업 완료 ), agent_runnable( 스케줄 됨 ), agent_started( 실행 중 ) 를 나타냅니다.

반환된 상태는 동기화되어 정확한 상태를 반환하지만, 반환하자마자 agent 의 상태가 변할 수 있어 반환된 상태가 현재 agent 의 상태라고 확신하면 안 됩니다.

 

ISource<agent_status> * status_port();

 status() 는 동기화된 agent 의 상태를 반환하는 반면, status_port() 는 비 동기 메커니즘은 message 를 통해 반환됩니다.

반환형인 ISource 는 message 메커니즘의 interface 입니다. ISource interface 형은 receive() 로 내용을 꺼내올 수 있습니다.

아직 message 에 대해서 언급하지 않았기 때문에 이해가 안 될 수 있습니다. 곧 message 에 대해서 설명할 것인데 그 때, 이 함수가 어떻게 동작하는지 알 수 있을 것입니다.

 

static agent_status wait( agent * _PAgent, unsigned int _Timeout = COOPERATIVE_TIMEOUT_INFINITE );

Win32 API 의 WaitForSingleObject() 와 같은 기능을 합니다.

인자로 넘긴 agent 의 작업이 종료 상태가 될 때까지 기다립니다. 종료 상태란 agent_cancel, agent_done 상태를 말합니다.

기다릴 최대 시간( timeout )을 정할 수 있는데 COOPERATIVE_TIMEOUT_INFINITE 는 무한대를 의미하며, 기본 값으로 지정되어 있습니다. 이 때, 최대 시간은 밀리 초( millisecond ) 단위입니다.

최대 시간을 정했을 경우, 최대 시간까지 agent 의 작업이 종료되지 않으면, operation_timed_out 예외가 발생합니다. 그러므로 최대 시간을 고려한 프로그래밍을 할 경우, 이 예외를 처리함으로써 최대 시간 이 후를 제어해야 합니다.

Agent 의 상태를 반환하는데, 이 상태는 agent_cancel 또는 agent_done 입니다. 왜냐하면 앞의 두 상태 중 하나가 되어야만 반환을 하기 때문입니다.

 

static void wait_for_all( size_t _Count, __in_ecount(_Count) agent ** _PAgents, __out_ecount_opt(_Count) agent_status * _PStatus = NULL, unsigned int _Timeout = COOPERATIVE_TIMEOUT_INFINITE );

Win32 API 의 WaitForMultipleObjects() ( 3번째 인자가 TRUE 인 )와 같은 기능을 합니다.

인자로 전달되는 agent 배열이 모두 완료되었을 때까지 기다립니다.

인자를 살펴보면,

  • _Count - 기다릴 agent 의 개수( 뒤의 인자인 agent 배열의 원소 수와 같아야 합니다. )
  • _PAgents - 기다릴 agent 들의 배열
  • _pStatus – 이 함수가 반환될 때, agent 들의 상태들을 저장할 배열( _Timedout 을 지정했을 때, operation_timed_out 예외가 발생한다면, 상태를 저장하는 작업은 수행되지 않습니다. ), 기본 값은 NULL 입니다.
  • _Timeout – 기다릴 최대 시간, 기본 값은 COOPERATIVE_TIMEOUT_INFINITE 이다.

 wait() 와 마찬가지로 기다릴 최대 시간을 지정할 경우, operation_timed_out 예외가 발생되며, 최대 시간을 고려할 경우, 이 예외를 처리해야 합니다.

 

static void wait_for_one( size_t _Count, agent ** _PAgents, agent_status& _Status, size_t& _Index, unsigned int _Timeout = COOPERATIVE_TIMEOUT_INFINITE );

Win32 API 의 WaitForMultipleObjects() ( 3번째 인자가 FALSE 인 )와 같은 기능을 합니다.

인자로 전달되는 agent 배열 중 하나라도 완료가 될 때까지 기다립니다.

인자를 살펴보면,

  • _Count – 기다릴 agent 의 개수( 뒤의 인자인 agent 배열의 원소 수와 같아야 합니다. )
  • _PAgent – 기다릴 agent 들의 배열
  • _Status – 반환될 때의 agent 상태를 저장할 agent_status 변수
  • _Index – agent 배열 중 완료된 agent 의 인덱스를 저장할 변수
  • _Timeout – 기다릴 최대 시간, 기본 값은 COOPERATIVE_TIMEOUT_INFINITE 입니다.

 wait() 와 마찬가지로 기다릴 최대 시간을 지정할 경우, operation_timed_out 예외가 발생하며, 최대 시간을 고려할 경우, 이 예외를 처리해야 합니다.

 

예제

아래의 코드는 위에 설명한 메소드들을 사용하는 상황을 보여줍니다. 특별한 시나리오는 없지만, 충분히 어떤 상황에 어떻게 사용하는지 알 수 있을 것입니다.

 TestAgentrun() 안에서 사용된 Concurrency::wait() 는 Win32 API 의 Sleep() 과 같은 기능을 합니다. agent::wait() 와 혼동하지 않기를 바랍니다.

예제를 위한 준비

#include <iostream>
#include <agents.h>

using namespace std;
using namespace Concurrency;

const wchar_t* GetAgentStatusString( agent_status status )
{
	switch( status )
	{
	case agent_created:
		return L"agent_created";

	case agent_runnable:
		return L"agent_runnable";

	case agent_started:
		return L"agent_started";

	case agent_done:
		return L"agent_done";

	case agent_canceled:
		return L"agent_canceled";

	default:
		return L"unknown";
	}
}

class TestAgent
	: public agent
{
private:
	unsigned int	id;

protected:
	virtual void run()
	{
		Concurrency::wait( this->id * 500 );		

		wcout << L"agent id: " << this->id << L" completed." << endl;

		done();
	}
public:
	TestAgent( unsigned int id )
		: id( id ) { }
};

[ 코드1. 예제를 위한 코드 ]

status_port() 의 사용

int main()
{	
	TestAgent testAgent( 5 );

	wcout << GetAgentStatusString( receive( testAgent.status_port() ) ) << endl;

	testAgent.start();
	
	for( unsigned int i = 0; i < 10; ++i )
	{		
		wcout << GetAgentStatusString( receive( testAgent.status_port() ) ) << endl;
	}

	agent::wait( &testAgent );

	wcout << GetAgentStatusString( receive( testAgent.status_port() ) ) << endl;
}

[ 코드2. status_port() 사용 예제 ]

 status() 와 같은 기능을 하지만 비 동기 메커니즘인 message 을 사용한다는 것이 다릅니다.

이 말은 status() 가 수행되는 동안 호출한 컨텍스트가 멈추어 있지만, status_port() 는 호출한 컨텍스트는 계속 진행되고, 다른 work 스레드의 컨텍스트가 상태를 처리하고, 그 결과를 message 로 받는 다는 말입니다.

[ 그림1. status_port() 사용 예제 실행 결과 ]


wait()의 사용

int main()
{
	TestAgent testAgent1( 3 );
	TestAgent testAgent2( 2 );
	TestAgent testAgent3( 1 );
	TestAgent testAgent4( 5 );
	TestAgent testAgent5( 4 );

	testAgent1.start();
	testAgent2.start();
	testAgent3.start();
	testAgent4.start();
	testAgent5.start();

	agent::wait( &testAgent1 );
}

[ 코드3. wait() 사용 예제 ]

각기 다른 5 개의 작업들을 수행하는 agent 들을 생성합니다. 각각의 생성자의 인자는 각 agent 의 id 이기도 하지만, 해당 작업들의 가중치이기도 합니다.

testAgent1 의 작업이 끝날 때까지 기다립니다. 곧바로 프로그램이 종료되므로 작업이 끝나지 않은 agent 들은 비정상적으로 종료됩니다. 보통 작업이 진행 중인 모든 agent 들이 완료될 때가지 기다리는 것이 좋습니다.

[ 그림2. wait() 사용 예제 실행 결과 ]


wait() 와 timeout

int main()
{
	TestAgent testAgent1( 3 );
	TestAgent testAgent2( 2 );
	TestAgent testAgent3( 1 );
	TestAgent testAgent4( 5 );
	TestAgent testAgent5( 4 );

	testAgent1.start();
	testAgent2.start();
	testAgent3.start();
	testAgent4.start();
	testAgent5.start();

	try
	{
		agent::wait( &testAgent4, 1000 );
	}
	catch( operation_timed_out& )
	{
		wcout << L"operation timed out." << endl;
	}
}

[ 코드4. wait() 와 timeout 예제 ]

 wait() 의 timeout 매개변수를 사용하려면 operation_timed_out 예외를 처리해야 합니다.

[ 그림3. wait() 와 timeout 예제 실행 결과 ]


wait_for_all() 의 사용

int main()
{
	TestAgent testAgent1( 3 );
	TestAgent testAgent2( 2 );
	TestAgent testAgent3( 1 );
	TestAgent testAgent4( 5 );
	TestAgent testAgent5( 4 );

	testAgent1.start();
	testAgent2.start();
	testAgent3.start();
	testAgent4.start();
	testAgent5.start();

	agent* runningAgents[5] = {
		&testAgent1,
		&testAgent2,
		&testAgent3,
		&testAgent4,
		&testAgent5
	};

	unsigned int runningAgentCount = sizeof( runningAgents ) / sizeof( runningAgents[0] );	

	agent::wait_for_all( runningAgentCount, runningAgents );
}

[ 코드5. wait_for_all() 사용 예제 ]

배열에 포함된 모든 agent 들이 완료될 때까지 기다립니다.

[ 그림4. wait_for_all() 사용 예제 실행 결과 ]


wait_for_all() 과 상태 반환

int main()
{
	TestAgent testAgent1( 3 );
	TestAgent testAgent2( 2 );
	TestAgent testAgent3( 1 );
	TestAgent testAgent4( 5 );
	TestAgent testAgent5( 4 );

	testAgent1.start();
	testAgent2.start();
	testAgent3.start();
	testAgent4.start();
	testAgent5.start();

	agent* runningAgents[5] = {
		&testAgent1,
		&testAgent2,
		&testAgent3,
		&testAgent4,
		&testAgent5
	};

	agent_status statuses[5];

	unsigned int runningAgentCount = sizeof( runningAgents ) / sizeof( runningAgents[0] );
	agent::wait_for_all( runningAgentCount, runningAgents, statuses );

	for( unsigned int i = 0; i < runningAgentCount; ++i )
		wcout << GetAgentStatusString( statuses[ i ] ) << endl;
}

[ 코드6. wait_for_all() 과 상태 반환 예제 ]

완료된 agent 들의 상태를 저장하는 배열의 크기는 agent 들을 포함한 배열의 크기와 같아야 합니다다. 만약 timeout 예외를 사용한다면 상태는 저장되지 않습니다.

[ 그림5. wait_for_all() 과 상태 반환 예제 실행 결과 ]


wait_for_one() 의 사용

int main()
{
	TestAgent testAgent1( 3 );
	TestAgent testAgent2( 2 );
	TestAgent testAgent3( 1 );
	TestAgent testAgent4( 5 );
	TestAgent testAgent5( 4 );

	testAgent1.start();
	testAgent2.start();
	testAgent3.start();
	testAgent4.start();
	testAgent5.start();

	agent* runningAgents[5] = {
		&testAgent1,
		&testAgent2,
		&testAgent3,
		&testAgent4,
		&testAgent5
	};

	agent_status finalStatus;
	unsigned int indexOfcompletedAgentFirst;

	unsigned int runningAgentCount = sizeof( runningAgents ) / sizeof( runningAgents[0] );	
	agent::wait_for_one( runningAgentCount, runningAgents, finalStatus, indexOfcompletedAgentFirst );

	wcout << L"final status : " << GetAgentStatusString( finalStatus ) << endl;
	wcout << L"index of completed agent first : " << indexOfcompletedAgentFirst << endl;	
}

[ 코드7. wait_for_one() 사용 예제 ]

여러 agent 들 중 하나라도 완료 되었을 때 반환됩니다. 상태 저장을 위한 변수와 인덱스 저장을 위한 변수는 참조로 넘겨야 하므로 생략할 수 없습니다.

[ 그림6. wait_for_one() 사용 예제 실행 결과 ]


 

마치는 글

위의 내용을 모두 숙지했다면 agent 를 마치 하나의 work 스레드처럼 다룰 수 있을 것입니다. 이렇게 하여 비 동기 병렬 처리를 쉽게 처리할 수 있습니다.

하지만 이것이 끝이 아닙니다. agent 에 message 메커니즘을 적용한다면 더욱 더 지능적인 agent 를 쉽게 만들 수 있습니다.

다음 글에서는 message 메커니즘에 대해 소개해 보도록 하겠습니다.