Asynchronous Agents Library
– message 전달 함수. 2 ( 수신 )
작성자: 임준환( mumbi at daum dot net )
Message 수신
Message 를 message block 에 전송할 수 있듯이, message block 으로부터 수신할 수도 있습니다. message 수신 함수에도 전송 함수와 마찬가지로 동기 함수인 receive() 와 비 동기 함수인 try_receive() 가 있습니다.
동기 함수 receive()
동기 함수인 receive() 는 message block 으로부터 수신이 완료될 때 수신된 message 를 반환합니다. 만약 message block 에 어떠한 message 도 없다면 receive() 는 message block 에 수신할 message 가 있을 때까지 기다립니다.
아래는 receive() 의 선언입니다.
template < class _Type > _Type receive( ISource<_Type> * _Src, unsigned int _Timeout = COOPERATIVE_TIMEOUT_INFINITE ); template < class _Type > _Type receive( ISource<_Type> * _Src, filter_method const& _Filter_proc, unsigned int _Timeout = COOPERATIVE_TIMEOUT_INFINITE ); template < class _Type > _Type receive( ISource<_Type> &_Src, unsigned int _Timeout = COOPERATIVE_TIMEOUT_INFINITE ); template < class _Type > _Type receive( ISource<_Type> &_Src, filter_method const& _Filter_proc, unsigned int _Timeout = COOPERATIVE_TIMEOUT_INFINITE );
[ 코드1. receive() 의 선언 ]
템플릿 매개변수인 _Type 은 message 의 자료 형입니다.
함수 매개변수 중 _Src 는 message block 의 인터페이스 중 하나인 ISource 를 상속한 message block 객체이며, 이 객체로부터 message 를 수신합니다.
함수 매개변수 중 _Timeout 은 최대 대기 시간입니다. 이것은 receive() 가 동기 함수이기 때문에 영원히 기다릴 상황을 대비하는 방법입니다. 이 매개변수를 지정했을 때, 최대 대기 시간을 초과하였을 경우, agent::wait()( Asynchronous Agents Library – agent. 2 ( 기능 ) 참고 ) 와 마찬가지로 operation_timed_out 예외를 발생합니다. 그러므로 이 매개변수를 지정 시 반드시 해당 예외를 처리해주어야 합니다. 기본 인자로 COOPERATIVE_TIME_INFINITE 가 지정되어 있으며, 무한히 기다리는 것을 의미합니다.
함수 매개변수 중 _Filter_proc 는 message 를 거부할 수 있는 필터입니다. message block 생성자로 지정할 수 있는 필터와 마찬가지로 std::tr1::function<bool(_Type const&)> 입니다.
Message 의 수신이 완료되면 해당 message 를 반환합니다.
예제
- 수신할 message 가 있는 경우
#include <iostream> #include <string> #include <agents.h> using namespace std; using namespace Concurrency; int main() { // message block unbounded_buffer< wstring > message_block; wstring send_message( L"first message!" ); send( message_block, send_message ); wstring receive_message; try { receive_message = receive( message_block, 2000 ); } catch( operation_timed_out& e ) { wcout << L"operation_timed_out exception" << e.what() << endl; } wcout << L"received message: " << receive_message << endl; }
[ 코드2. receive() 의 수신할 message 가 있는 경우 예제 ]
receive() 의 매개변수로 최대 대기 시간을 지정했지만 message block 에 message 가 존재하기 때문에 수신하고 바로 반환합니다.
[ 그림1. receive() 의 수신할 message 가 있는 경우 예제 실행 결과 ]
- 수신할 message 가 없는 경우
#include <iostream> #include <string> #include <agents.h> using namespace std; using namespace Concurrency; int main() { // message block unbounded_buffer< wstring > message_block; // wstring send_message( L"first message!" ); // send( message_block, send_message ); wstring receive_message; try { receive_message = receive( message_block, 2000 ); } catch( operation_timed_out& e ) { wcout << L"operation_timed_out exception" << e.what() << endl; } wcout << L"received message: " << receive_message << endl; }
[ 코드3. receive() 의 수신할 message 가 없는 경우 예제 ]
send() 를 주석 처리하여 message block 에 전달한 message 가 없기 때문에 receive() 는 지정된 최대 대기 시간인 2초( 2000 milli second ) 동안 기다린 후, operation_timed_out 예외가 발생합니다.
이 예외를 처리해야 정상적으로 프로그램이 진행됩니다.
만약 최대 대기 시간을 지정하지 않았다면 무한 대기하게 됩니다.
[ 그림2. receive() 의 수신할 message 가 없는 경우 예제 실행 결과 ]
비 동기 함수 try_receive()
비 동기 함수인 try_receive() 는 message 가 수신될 때까지 기다리지 않습니다. 만약 수신할 message block 에 어떠한 message 도 없다고 하더라도 기다리지 않고, 바로 반환됩니다.
아래는 try_receive() 의 선언입니다.
template < class _Type > bool try_receive( ISource<_Type> * _Src, _Type & _value ); template < class _Type > bool try_receive( ISource<_Type> * _Src, _Type & _value, filter_method const& _Filter_proc ); template < class _Type > bool try_receive( ISource<_Type> & _Src, _Type & _value ); template < class _Type > bool try_receive( ISource<_Type> & _Src, _Type & _value, filter_method const& _Filter_proc );
[ 코드4. try_receive() 의 선언 ]
템플릿 매개변수인 _Type 은 receive() 와 마찬가지로 message 의 자료 형입니다.
함수 매개변수 중 _Src 도 receive() 와 마찬가지로 message block 의 인터페이스 중 하나인 ISource 를 상속한 message block 객체이며, 이 객체로부터 message 를 수신합니다.
함수 매개변수 중 _value 는 수신한 message 를 저장할 변수의 참조입니다. 수신이 성공하면 message 는 이 참조가 가리키는 변수에 저장됩니다.
함수 매개변수 중 _Filter_proc 는 receive() 와 마찬가지로 message 를 거부할 수 있는 필터입니다.
try_receive() 는 수신의 완료를 기다리지 않기 때문에 수신을 시도했을 때( try_receive() 를 호출했을 때 ) message block 에 어떠한 message 도 없다면 false 를 반환해 알려줍니다. message 가 있다면 true 를 반환합니다.
만약, 수신 시도를 하자마자 시도한 컨텍스트가 계속 진행되기를 원한다면 receive() 에 _Timeout 매개변수에 0 을 지정하기 보다는 try_receive() 를 사용하는게 바람직합니다.
예제
- 수신할 message 가 있는 경우
#include <iostream> #include <string> #include <agents.h> using namespace std; using namespace Concurrency; int main() { // message block unbounded_buffer< wstring > message_block; send( message_block, wstring( L"first message!" ) ); wstring received_message; if( try_receive( message_block, received_message ) ) { wcout << L"receive success" << endl; wcout << L"received message: " << received_message << endl; } else { wcout << L"receive fail" << endl; } }
[ 코드5. try_receive() 의 수신할 message 가 있는 경우 예제 ]
try_receive() 는 비 동기 함수이기 때문에 수신할 message 가 있든 없든 먼저 반환됩니다. 수신할 message 가 있으면 true 를 반환하고 인자인 참조 변수에 수신한 message 를 저장합니다,
[ 그림3. try_receive() 의 수신할 message 가 있는 경우 예제 실행 결과 ]
- 수신할 message 가 없는 경우
#include <iostream> #include <string> #include <agents.h> using namespace std; using namespace Concurrency; int main() { // message block unbounded_buffer< wstring > message_block; // send( message_block, wstring( L"first message!" ) ); wstring received_message; if( try_receive( message_block, received_message ) ) { wcout << L"receive success" << endl; wcout << L"received message: " << received_message << endl; } else { wcout << L"receive fail" << endl; } }
[ 코드6. try_receive() 의 수신할 message 가 없는 경우 예제 ]
동기 함수인 receive() 와는 달리 비 동기 함수인 try_receive() 는 수신할 message 가 없을 경우, 기다리지 않고 false 를 반환합니다.
수신할 message 가 있든 없든 바로 반환해야 하는 경우라면 receive() 의 매개변수인 최대 대기 시간을 0 으로 지정하는 것보다는 try_receive() 를 권장합니다.
receive() 의 최대 대기 시간은 예외 메커니즘을 사용하므로 try_receive() 에 비해 오버헤드가 있을 수 있습니다.
[ 그림4. try_receive() 의 수신할 message 가 없는 경우 예제 실행 결과 ]
Message 필터
지난 글에서 message block 에 필터를 지정할 수 있다고 언급했습니다. message block 에 지정되는 필터는 message block 에 전송 시에 적용됩니다.
마찬가지로 수신 함수들에도 필터를 지정할 수 있습니다. message block 으로부터 수신 시에 적용됩니다.
동기 함수인 receive() 는 수신할 message 가 필터에 의해 수락될 때까지 대기합니다. 즉, 수신할 message 가 필터에 의해 거부된다면 message block 에 message 가 없을 때와 같습니다.
비 동기 함수인 try_receive() 또한 message block 에 message 가 없을 때와 마찬가지로 false 를 반환합니다.
다시 말해, message block 에 message 가 없는 경우에도 필터에 의해 거부된다는 말과 같습니다.
그럼, 필터를 이용한 수신 함수에 대한 예제를 살펴보겠습니다.
예제
#include <iostream> #include <vector> #include <iterator> #include <functional> #include <agents.h> #include <ppl.h> using namespace std; using namespace std::tr1; using namespace Concurrency; class number_collector : public agent { public: number_collector( ISource< int >& source, vector< int >& result, function< bool ( int ) > filter ) : source( source ) , result( result ) , filter( filter ) { } protected: void run() { while( true ) { int number = receive( this->source, this->filter ); if( 0 == number ) break; this->result.push_back( number ); } this->done(); } private: ISource< int >& source; vector< int >& result; function< bool ( int ) > filter; }; int main() { // message block unbounded_buffer< int > message_block; // send number 1 ~ 10. parallel_for( 1, 11, [&]( int number ) { send( message_block, number ); } ); // send stop signal. send( message_block, 0 ); // for even. send( message_block, 0 ); // for odd. vector< int > even_number_array, odd_number_array; number_collector even_number_collector( message_block, even_number_array, []( int number ) -> bool { return 0 == number % 2; } ); number_collector odd_number_collector( message_block, odd_number_array, []( int number ) -> bool { if( 0 == number ) return true; return 0 != number % 2; } ); even_number_collector.start(); odd_number_collector.start(); // wait for all agents. agent* number_collectors[2] = { &even_number_collector, &odd_number_collector }; agent::wait_for_all( 2, number_collectors ); // print wcout << L"odd numbers: "; copy( odd_number_array.begin(), odd_number_array.end(), ostream_iterator< int, wchar_t >( wcout, L" " ) ); wcout << endl << L"even numbers: "; copy( even_number_array.begin(), even_number_array.end(), ostream_iterator< int, wchar_t >( wcout, L" " ) ); wcout << endl; }
[ 코드7. 필터를 이용한 숫자 고르기 예제 ]
우선 message block 에 1 ~ 10 의 정수를 전송합니다. parallel_for() 를 사용하였는데 이 함수는 Concurrency Runtime 위에서 AAL 과 작동하는 돌아가는 Parallel Patterns Library( 이하, PPL ) 에서 제공하는 함수입니다. PPL 에 대한 자세항 사항은 visual studio 팀 블로그에서 확인하실 수 있습니다.
parallel_for() 는 반복될 내용을 병렬로 처리하기 때문에 성능에 도움을 줍니다. 그러나 반복되는 순서를 보장하지 않습니다.
그래서 1 ~ 10 의 정수가 전송되는 순서는 알 수 없습니다. 하지만 1 ~ 10 의 정수를 모두 전송한 뒤, 0을 보내서 마지막 message 라는 것을 알려주었습니다. 두 번 보낸 0 중 하나는 짝수를 수신하는 agent 를 위한 것이고, 하나는 홀수를 수신하는 agent 를 위한 것입니다.
사실, 이런 처리 로직을 구성할 때에는 상태 변화 알림에 유용한 다른 message block 을 사용하는 것이 좋지만 아직 message block 에 대해서 설명하지 않았기 때문에 혼란을 줄이기 위해 간단한 unbounded_buffer 하나만으로 처리하였습니다.
위 코드에 정의된 agent 인 number_collector 는 message block 으로부터 필터에 의해 필터링된 message 를 컨테이너에 저장합니다.
동기 함수인 receive() 를 사용했기 때문에 원하는 message 가 올 때까지 기다립니다. 이로 인해 필요한 만큼의 최소의 반복을 하여 오버헤드가 줄어 듭니다.
만약 비 동기 함수인 try_receive() 를 사용했다면 쓸모 없는 반복 오버헤드를 발생시킬 것입니다. 이 예제의 경우에는 동기 함수인 receive() 가 적합합니다.
정의된 agent 를 짝수용과 홀수용을 선언하고 start() 를 사용하여 작업을 시작합니다. 그리고 wait_for_all() 을 사용하여 두 agent 가 모두 끝날 때까지 기다린 후, 모든 작업이 종료되면 화면에 수집한 정수들을 출력합니다.
위 예제 코드는 Visual studio 2008 부터 지원하는 tr1 의 function 과 visual studio 2010 부터 지원하는 C++0x 의 람다를 사용하였습니다. Concurrency Runtime 은 tr1, C++0x 등의 visual studio 2010 의 새로운 feature 들을 사용하여 구현되었기 때문에 이것들에 대해 알아두는 것이 좋습니다.
[ 그림5. 필터를 이용한 숫자 고르기 예제 실행 결과 ]
마치는 글
이 글에서는 message 전달 함수 중 수신 함수인 receive() 와 try_receive() 에 대해서 알아보았습니다.
receive() 와 try_receive() 는 사용해야 할 상황이 분명히 다르니 상황에 따라 사용에 유의해야 합니다.
다음 글에서는 message 가 저장되는 message block 에 대해서 알아보도록 하겠습니다.
'VC++ 10 Concurrency Runtime' 카테고리의 다른 글
Asynchronous Agents Library – message block 2. ( unbounded_buffer ) (2) | 2010.07.18 |
---|---|
Asynchronous Agents Library – message block 1. ( 인터페이스 ) (0) | 2010.07.10 |
Asynchronous Agents Library - message 전달 함수. 1 ( 전송 ) (0) | 2010.06.26 |
Asynchronous Agents Library – agent. 2 ( 기능 ) (0) | 2010.06.13 |
Asynchronous Agents Library - agent. 1 ( 상태 ) (4) | 2010.06.05 |