Asynchronous Agents Library – agent. 2 ( 기능 )
작성자: 임준환( mumbi at daum dot net )
void run();
Agent 클래스를 상속 받아 작업을 하는 agent 를 만들 때, 처음으로 해야 할 일은 run() 재정의입니다.
run() 는 CPU 가 해당 agent 스레드의 컨텍스트를 처리할 때, 수행되는 메소드입니다. 즉, 바로 agent 가 책임지고 처리해야 할 작업( task )이고, run() 을 재정의하기 위해 agent class 가 존재한다고 해도 과언이 아닐 정도로 중요합니다.
Asynchronous Agents Library( 이하 AAL )을 사용하지 않고 Win32 API 로 직접 스레드를 생성할 때, 지정하는 콜백 함수와 같은 역할을 합니다.
run() 에 필요한 정보( 매개 변수 )가 있다면 agent 를 상속 받은 클래스의 생성자를 이용하여 전달하면 됩니다.
run() 이 호출될 때, agent 의 상태는 agent_started 가 됩니다.
run() 을 재정의할 때, 주의할 점은 run() 이 끝나기 전에 done() 을 호출해야 한다는 것입니다. 실제로 run() 이 끝났다는 것은 작업이 끝난 것이지만 상태는 여전히 agent_started 이기 때문에 계속 수행 중인 것으로 인식됩니다. 그러므로 agent 의 상태를 바꿔주기 위해 반드시 run() 이 끝나기 전에 done() 을 호출해야 합니다.
또한 run() 은 어떤 예외도 던지지 않습니다.
bool done();
Agent 의 작업이 완료되었음을 알립니다. 이것은 agent 의 상태를 agent_done 으로 바꾸는 것을 의미합니다.
제대로 agent_done 상태가 되면 true 를 반환합니다. cancel() 에 의해 agent_cancel 상태인 agent 는 agent_done 상태가 되지 않고 false 를 반환합니다.
protected 로 지정되어 있어 메소드 내에서만 호출할 수 있습니다.
bool start();
start() 를 호출함으로써 CPU 스케줄에 의해 run() 이 호출되는 것입니다. run() 이 호출되기 위해서는 반드시 start() 를 호출해야 합니다. 직접 run() 을 호출하면 병렬 처리 또는 비 동기 처리되지 않고, 호출한 스레드의 컨텍스트에서 일반 함수를 호출한 것과 같게 됩니다.
그러므로 직접 run() 을 호출하는 일은 없어야 하며, 꼭 start() 를 호출하도록 해야 합니다.
start() 는 agent 의 상태를 agent_created 에서 agent_runnable 로 바꿉니다. 즉, 스케줄하여 컨텍스트 스위칭( context switching ) 의 대상이 되도록 합니다.
Agent 가 제대로 스케줄 되었다면 true 를 반환합니다. 스케줄 되기 전( start() 호출 전 )에 cancel() 을 호출하면 스케줄 되지 않고 false 를 반환합니다.
bool cancel();
Agent 객체의 작업을 취소할 때 사용합니다.
Agent 객체가 생성되어 agent_created 상태가 되거나, start() 에 의해 agent_runnable 상태일 때에 작업을 취소하고 종료된 상태인 agent_cancel 상태로 바꿉니다.
다시 말해, run() 이 호출되어 agent_started 상태에서는 agent_cancel 상태로 바뀌지 않고 실패하여 false 를 반환합니다. 제대로 agent_cancel 상태로 바뀌었다면 true 를 반환합니다.
agent_status status();
Agent 객체의 현재 상태를 반환합니다.
agent_status 는 enum 형으로 agent_canceled( 취소됨 ), agent_created( 생성됨 ), agent_done( 작업 완료 ), agent_runnable( 스케줄 됨 ), agent_started( 실행 중 ) 를 나타냅니다.
반환된 상태는 동기화되어 정확한 상태를 반환하지만, 반환하자마자 agent 의 상태가 변할 수 있어 반환된 상태가 현재 agent 의 상태라고 확신하면 안 됩니다.
ISource<agent_status> * status_port();
status() 는 동기화된 agent 의 상태를 반환하는 반면, status_port() 는 비 동기 메커니즘은 message 를 통해 반환됩니다.
반환형인 ISource 는 message 메커니즘의 interface 입니다. ISource interface 형은 receive() 로 내용을 꺼내올 수 있습니다.
아직 message 에 대해서 언급하지 않았기 때문에 이해가 안 될 수 있습니다. 곧 message 에 대해서 설명할 것인데 그 때, 이 함수가 어떻게 동작하는지 알 수 있을 것입니다.
static agent_status wait( agent * _PAgent, unsigned int _Timeout = COOPERATIVE_TIMEOUT_INFINITE );
Win32 API 의 WaitForSingleObject() 와 같은 기능을 합니다.
인자로 넘긴 agent 의 작업이 종료 상태가 될 때까지 기다립니다. 종료 상태란 agent_cancel, agent_done 상태를 말합니다.
기다릴 최대 시간( timeout )을 정할 수 있는데 COOPERATIVE_TIMEOUT_INFINITE 는 무한대를 의미하며, 기본 값으로 지정되어 있습니다. 이 때, 최대 시간은 밀리 초( millisecond ) 단위입니다.
최대 시간을 정했을 경우, 최대 시간까지 agent 의 작업이 종료되지 않으면, operation_timed_out 예외가 발생합니다. 그러므로 최대 시간을 고려한 프로그래밍을 할 경우, 이 예외를 처리함으로써 최대 시간 이 후를 제어해야 합니다.
Agent 의 상태를 반환하는데, 이 상태는 agent_cancel 또는 agent_done 입니다. 왜냐하면 앞의 두 상태 중 하나가 되어야만 반환을 하기 때문입니다.
static void wait_for_all( size_t _Count, __in_ecount(_Count) agent ** _PAgents, __out_ecount_opt(_Count) agent_status * _PStatus = NULL, unsigned int _Timeout = COOPERATIVE_TIMEOUT_INFINITE );
Win32 API 의 WaitForMultipleObjects() ( 3번째 인자가 TRUE 인 )와 같은 기능을 합니다.
인자로 전달되는 agent 배열이 모두 완료되었을 때까지 기다립니다.
인자를 살펴보면,
- _Count - 기다릴 agent 의 개수( 뒤의 인자인 agent 배열의 원소 수와 같아야 합니다. )
- _PAgents - 기다릴 agent 들의 배열
- _pStatus – 이 함수가 반환될 때, agent 들의 상태들을 저장할 배열( _Timedout 을 지정했을 때, operation_timed_out 예외가 발생한다면, 상태를 저장하는 작업은 수행되지 않습니다. ), 기본 값은 NULL 입니다.
- _Timeout – 기다릴 최대 시간, 기본 값은 COOPERATIVE_TIMEOUT_INFINITE 이다.
wait() 와 마찬가지로 기다릴 최대 시간을 지정할 경우, operation_timed_out 예외가 발생되며, 최대 시간을 고려할 경우, 이 예외를 처리해야 합니다.
static void wait_for_one( size_t _Count, agent ** _PAgents, agent_status& _Status, size_t& _Index, unsigned int _Timeout = COOPERATIVE_TIMEOUT_INFINITE );
Win32 API 의 WaitForMultipleObjects() ( 3번째 인자가 FALSE 인 )와 같은 기능을 합니다.
인자로 전달되는 agent 배열 중 하나라도 완료가 될 때까지 기다립니다.
인자를 살펴보면,
- _Count – 기다릴 agent 의 개수( 뒤의 인자인 agent 배열의 원소 수와 같아야 합니다. )
- _PAgent – 기다릴 agent 들의 배열
- _Status – 반환될 때의 agent 상태를 저장할 agent_status 변수
- _Index – agent 배열 중 완료된 agent 의 인덱스를 저장할 변수
- _Timeout – 기다릴 최대 시간, 기본 값은 COOPERATIVE_TIMEOUT_INFINITE 입니다.
wait() 와 마찬가지로 기다릴 최대 시간을 지정할 경우, operation_timed_out 예외가 발생하며, 최대 시간을 고려할 경우, 이 예외를 처리해야 합니다.
예제
아래의 코드는 위에 설명한 메소드들을 사용하는 상황을 보여줍니다. 특별한 시나리오는 없지만, 충분히 어떤 상황에 어떻게 사용하는지 알 수 있을 것입니다.
TestAgent 의 run() 안에서 사용된 Concurrency::wait() 는 Win32 API 의 Sleep() 과 같은 기능을 합니다. agent::wait() 와 혼동하지 않기를 바랍니다.
예제를 위한 준비
#include <iostream> #include <agents.h> using namespace std; using namespace Concurrency; const wchar_t* GetAgentStatusString( agent_status status ) { switch( status ) { case agent_created: return L"agent_created"; case agent_runnable: return L"agent_runnable"; case agent_started: return L"agent_started"; case agent_done: return L"agent_done"; case agent_canceled: return L"agent_canceled"; default: return L"unknown"; } } class TestAgent : public agent { private: unsigned int id; protected: virtual void run() { Concurrency::wait( this->id * 500 ); wcout << L"agent id: " << this->id << L" completed." << endl; done(); } public: TestAgent( unsigned int id ) : id( id ) { } };
[ 코드1. 예제를 위한 코드 ]
status_port() 의 사용
int main() { TestAgent testAgent( 5 ); wcout << GetAgentStatusString( receive( testAgent.status_port() ) ) << endl; testAgent.start(); for( unsigned int i = 0; i < 10; ++i ) { wcout << GetAgentStatusString( receive( testAgent.status_port() ) ) << endl; } agent::wait( &testAgent ); wcout << GetAgentStatusString( receive( testAgent.status_port() ) ) << endl; }
[ 코드2. status_port() 사용 예제 ]
status() 와 같은 기능을 하지만 비 동기 메커니즘인 message 을 사용한다는 것이 다릅니다.
이 말은 status() 가 수행되는 동안 호출한 컨텍스트가 멈추어 있지만, status_port() 는 호출한 컨텍스트는 계속 진행되고, 다른 work 스레드의 컨텍스트가 상태를 처리하고, 그 결과를 message 로 받는 다는 말입니다.
[ 그림1. status_port() 사용 예제 실행 결과 ]
wait()의 사용
int main() { TestAgent testAgent1( 3 ); TestAgent testAgent2( 2 ); TestAgent testAgent3( 1 ); TestAgent testAgent4( 5 ); TestAgent testAgent5( 4 ); testAgent1.start(); testAgent2.start(); testAgent3.start(); testAgent4.start(); testAgent5.start(); agent::wait( &testAgent1 ); }
[ 코드3. wait() 사용 예제 ]
각기 다른 5 개의 작업들을 수행하는 agent 들을 생성합니다. 각각의 생성자의 인자는 각 agent 의 id 이기도 하지만, 해당 작업들의 가중치이기도 합니다.
testAgent1 의 작업이 끝날 때까지 기다립니다. 곧바로 프로그램이 종료되므로 작업이 끝나지 않은 agent 들은 비정상적으로 종료됩니다. 보통 작업이 진행 중인 모든 agent 들이 완료될 때가지 기다리는 것이 좋습니다.
[ 그림2. wait() 사용 예제 실행 결과 ]
wait() 와 timeout
int main() { TestAgent testAgent1( 3 ); TestAgent testAgent2( 2 ); TestAgent testAgent3( 1 ); TestAgent testAgent4( 5 ); TestAgent testAgent5( 4 ); testAgent1.start(); testAgent2.start(); testAgent3.start(); testAgent4.start(); testAgent5.start(); try { agent::wait( &testAgent4, 1000 ); } catch( operation_timed_out& ) { wcout << L"operation timed out." << endl; } }
[ 코드4. wait() 와 timeout 예제 ]
wait() 의 timeout 매개변수를 사용하려면 operation_timed_out 예외를 처리해야 합니다.
[ 그림3. wait() 와 timeout 예제 실행 결과 ]
wait_for_all() 의 사용
int main() { TestAgent testAgent1( 3 ); TestAgent testAgent2( 2 ); TestAgent testAgent3( 1 ); TestAgent testAgent4( 5 ); TestAgent testAgent5( 4 ); testAgent1.start(); testAgent2.start(); testAgent3.start(); testAgent4.start(); testAgent5.start(); agent* runningAgents[5] = { &testAgent1, &testAgent2, &testAgent3, &testAgent4, &testAgent5 }; unsigned int runningAgentCount = sizeof( runningAgents ) / sizeof( runningAgents[0] ); agent::wait_for_all( runningAgentCount, runningAgents ); }
[ 코드5. wait_for_all() 사용 예제 ]
배열에 포함된 모든 agent 들이 완료될 때까지 기다립니다.
[ 그림4. wait_for_all() 사용 예제 실행 결과 ]
wait_for_all() 과 상태 반환
int main() { TestAgent testAgent1( 3 ); TestAgent testAgent2( 2 ); TestAgent testAgent3( 1 ); TestAgent testAgent4( 5 ); TestAgent testAgent5( 4 ); testAgent1.start(); testAgent2.start(); testAgent3.start(); testAgent4.start(); testAgent5.start(); agent* runningAgents[5] = { &testAgent1, &testAgent2, &testAgent3, &testAgent4, &testAgent5 }; agent_status statuses[5]; unsigned int runningAgentCount = sizeof( runningAgents ) / sizeof( runningAgents[0] ); agent::wait_for_all( runningAgentCount, runningAgents, statuses ); for( unsigned int i = 0; i < runningAgentCount; ++i ) wcout << GetAgentStatusString( statuses[ i ] ) << endl; }
[ 코드6. wait_for_all() 과 상태 반환 예제 ]
완료된 agent 들의 상태를 저장하는 배열의 크기는 agent 들을 포함한 배열의 크기와 같아야 합니다다. 만약 timeout 예외를 사용한다면 상태는 저장되지 않습니다.
[ 그림5. wait_for_all() 과 상태 반환 예제 실행 결과 ]
wait_for_one() 의 사용
int main() { TestAgent testAgent1( 3 ); TestAgent testAgent2( 2 ); TestAgent testAgent3( 1 ); TestAgent testAgent4( 5 ); TestAgent testAgent5( 4 ); testAgent1.start(); testAgent2.start(); testAgent3.start(); testAgent4.start(); testAgent5.start(); agent* runningAgents[5] = { &testAgent1, &testAgent2, &testAgent3, &testAgent4, &testAgent5 }; agent_status finalStatus; unsigned int indexOfcompletedAgentFirst; unsigned int runningAgentCount = sizeof( runningAgents ) / sizeof( runningAgents[0] ); agent::wait_for_one( runningAgentCount, runningAgents, finalStatus, indexOfcompletedAgentFirst ); wcout << L"final status : " << GetAgentStatusString( finalStatus ) << endl; wcout << L"index of completed agent first : " << indexOfcompletedAgentFirst << endl; }
[ 코드7. wait_for_one() 사용 예제 ]
여러 agent 들 중 하나라도 완료 되었을 때 반환됩니다. 상태 저장을 위한 변수와 인덱스 저장을 위한 변수는 참조로 넘겨야 하므로 생략할 수 없습니다.
[ 그림6. wait_for_one() 사용 예제 실행 결과 ]
마치는 글
위의 내용을 모두 숙지했다면 agent 를 마치 하나의 work 스레드처럼 다룰 수 있을 것입니다. 이렇게 하여 비 동기 병렬 처리를 쉽게 처리할 수 있습니다.
하지만 이것이 끝이 아닙니다. agent 에 message 메커니즘을 적용한다면 더욱 더 지능적인 agent 를 쉽게 만들 수 있습니다.
다음 글에서는 message 메커니즘에 대해 소개해 보도록 하겠습니다.
'VC++ 10 Concurrency Runtime' 카테고리의 다른 글
Asynchronous Agents Library – message 전달 함수. 2 ( 수신 ) (0) | 2010.07.03 |
---|---|
Asynchronous Agents Library - message 전달 함수. 1 ( 전송 ) (0) | 2010.06.26 |
Asynchronous Agents Library - agent. 1 ( 상태 ) (4) | 2010.06.05 |
Asynchronous Agents Library 소개 (0) | 2010.05.29 |
Concurrency Runtime(ConcRT)의 디버그 모드에서 메모리 leak 문제 (5) | 2010.04.19 |