Asynchronous Agents Library – message block 9. ( custom )
VC++ 10 Concurrency Runtime 2010. 8. 24. 08:30Asynchronous Agents Library
– message block 9. ( custom )
작성자: 임준환( mumbi at daum dot net )
시작하는 글
아마도 이 글이 Asynchronous Agents Library( 이하 AAL ) 에 관련된 마지막 글이라고 생각이 됩니다. 더 추가적으로 적을 내용이 생기면 더 적어 보도록 하겠습니다.
이전 글까지 AAL 에서 제공하는 message block 들을 알아보았지만, 제공되는 것들이 마음에 들지 않는 분들은 직접 message block 을 만들어 사용할 수 있습니다.
직접 message block 을 만들어 사용하는 것이 그리 간단하지만은 않기 때문에 이번 글이 좀 길어질 것 같습니다. 천천히 읽어주시기 바랍니다.
Message block 의 종류와 인터페이스
우선 message block 정의에 앞서 message block 의 종류에 대해서 알아보는 것이 좋겠습니다. 이미 이전 글들에서 본 제공되는 message block 들을 보시면서 아셨겠지만, 한 번 더 짚고 넘어가도록 하겠습니다.
Message 를 보내는 역할을 하는 message block 을 source block 이라 하고, message 를 받는 역할을 하는 역할을 하는 message block 을 target block 이라고 합니다.
이 두 가지 종류의 block 들의 기본적인 행동들을 알리기 위해서 source block 들은 ISource 인터페이스를, target block 들은 ITarget 인터페이스를 상속받아 인터페이스 메소드들을 정의해야 합니다.
Message block 의 기본 클래스들
위에서 언급한 ISource 인터페이스나 ITarget 인터페이스를 직접 상속받아 정의해도 되지만, AAL 에서는 그 인터페이스를 상속받아 정의하기 쉽도록 해주는 기본 클래스 3 가지를 제공합니다.
- source_block – ISource 인터페이스를 상속받고, 다른 block 에 message 를 보냅니다.
- target_block – ITarget 인터페이스를 상속받고, 다른 block 으로부터 message 를 받습니다.
- propagator_block – ISource 인터페이스와 ITarget 인터페이스를 상속받고, 다른 block 들과 message 를 보내고 받습니다.
AAL 에서는 사용자 정의 message block 을 정의할 때, 인터페이스를 직접 상속받기 보다는 위의 기본 클래스들을 상속받아 정의하는 것을 권고하고 있습니다.
연결 관리 및 message 관리를 위한 템플릿 매개변수에 사용할 수 있는 클래스 템플릿
위의 3가지 기본 클래스들은 클래스 템플릿으로, message block 들간의 연결을 어떻게 관리할 것인지, message 들을 어떻게 관리할 것인지에 대한 정보를 템플릿 매개변수를 통해 지정할 수 있습니다.
AAL 은 연결 관리를 위한 2가지의 클래스 템플릿을 제공하고 있습니다.
- single_link_registry – source 나 target 의 하나의 연결만 허용.
- multi_link_registry – source 나 target 의 여럿의 연결을 허용.
예를 들면, AAL 에서 제공하는 transformer 는 출력을 하나의 연결만 허용하는 single_link_registry 를 사용하고, 입력은 여럿의 연결을 허용하는 multi_link_registry 를 사용하고 있습니다.
template<class _Input, class _Output> class transformer : public propagator_block<single_link_registry<ITarget<_Output>>, multi_link_registry<ISource<_Input>>>
[ 코드1. transformer 에서 사용되는 single_link_registry 와 multi_link_registry ]
또한 message 관리를 위한 하나의 클래스 템플릿을 제공합니다.
- ordered_mesage_processor – message 를 받는 순서대로 처리하도록 허용.
재정의해야 할 멤버 함수들
위에 말한 기본 클래스들을 상속한 클래스는 다음과 같은 멤버 함수를 재정의해야 합니다.
source_block 을 상속한 클래스가 재정의해야 할 멤버 함수들
- propagate_to_any_targets()
- accept_message()
- reserve_message()
- consume_message()
- release_message()
- resume_propagation()
propagate_to_any_targets()
입력 message 나 출력 message 를 비 동기 또는 동기적으로 처리하기 위해 런타임으로부터 호출됩니다.
accept_message()
Message 를 수락하기 위해 target block 으로부터 호출됩니다.
reserve_message(), consume_message(), release_message(), resume_propagation()
이 멤버 함수들은 message 를 예약하는 기능을 제공합니다.
target block 은 message 를 제공받을 때와 message 를 나중에 사용하기 위해 예약해야 할 때 reserve_message() 를 호출합니다.
target block 은 하나의 message 를 예약한 후에, message 를 사용하기 위해 consume_message() 를 호출하거나 예약을 취소하기 위해 release_message() 를 호출할 수 있습니다.
consume_message() 는 accept_message() 와 함께 사용되어 message 의 소유권을 보내거나 message 의 복사본을 보내도록 할 수 있습니다.
예를 들어, unbounded_buffer 와 같은 message block 은 오직 하나의 target 에만 message 를 보냅니다. 즉, message 의 소유권을 target 에 보냅니다. 그렇기 때문에 보낸 후, message block 내에는 보낸 message 가 남아있지 않습니다.
반면에 overwrite_buffer 와 같은 message block 은 각 연결된 target 에 각각 message 를 제공합니다. 즉, message 의 복사본을 보냅니다, 그래서 보낸 후, message block 내에 보낸 message 가 여전히 존재하게 됩니다.
target block 이 예약된 message 를 사용하거나 취소한 후에, 런타임은 resume_propagation() 을 호출합니다. resume_propagation() 은 큐( queue )의 다음 message 를 시작으로 message 의 전달을 계속해서 진행시킵니다.
target_block 을 상속한 클래스가 재정의해야 할 멤버 함수들
- propagate_message()
- send_message() ( option )
propagate_message(), send_message()
런타임은 다른 block 으로부터 현재의 block 으로 message 를 비 동기적으로 받기 위해 propagate_message() 를 호출합니다.
send_message() 는 비 동기적이 아니라 동기적으로 message 를 보내는 것을 제외하면 propagate_message() 와 비슷합니다.
기본적으로 send_message() 의 정의는 모든 입력 message 를 거부하도록 구현되어 있습니다.
만약 message 가 target block 에 설정된 필터 함수를 통과하지 못하면 send_message() 나 propagate_message() 를 호출하지 않습니다.
propagator_block 을 상속한 클래스가 재정의해야 할 멤버 함수들
propagator_block 은 source_block 과 target_block 의 특징을 모두 상속하므로 두 클래스에 대해 재정의해야 할 멤버 함수들을 모두 재정의해야 합니다.
예제: priority_buffer
priority_buffer 는 우선 순위를 기준으로 순서를 정하는 사용자 정의 message block 입니다. 여럿의 message 들을 받을 때 우선 순위대로 처리할 때 사용하기 유용합니다.
Message 큐를 가지고 있고, source 와 target 의 역할을 하며, 여럿의 source 와 여럿의 target 을 연결할 수 있기 때문에 unbounded_buffer 와 비슷합니다.
그러나 unbounded_buffer 는 message 를 받는 순서를 기반으로 message 를 전달한다는 것이 다릅니다.
priority_buffer 는 우선 순위와 데이터를 같이 전달해야 하므로 우선 순위 타입과 데이터의 타입을 포함하는 tuple 타입의 message 를 전달합니다.
priority_buffer 는 2개의 message 큐를 관리합니다. 입력 message 를 위한 std::priority_queue 와 출력 message 를 위한 std::queue 입니다.
priority_buffer 의 정의를 위해 propagator_block 을 상속하고 7개의 멤버 함수들을 정의해야하고, link_target_notification() 과 send_message() 를 재정의해야 합니다.
또한 2개의 public 멤버 함수인 enqueue() 와 dequeue() 를 정의합니다. private 멤버 함수로 propagate_priority_order() 를 정의합니다.
코드
#include <agents.h> #include <queue> // 우선 순위를 비교할 비교 함수 객체. namespace std { template< class Type, class PriorityType > struct less< Concurrency::message< tuple< PriorityType, Type > >* > { typedef Concurrency::message< tuple< PriorityType, Type > > MessageType; bool operator()( const MessageType* left, const MessageType* right ) const { // message 가 tuple 이므로 get() 을 사용. return ( get< 0 >( left->payload ) < get< 0 >( right->payload ) ); } }; template< class Type, class PriorityType > struct greater< Concurrency::message< tuple< PriorityType, Type > >* > { typedef Concurrency::message< tuple< PriorityType, Type > > MessageType; bool operator()( const MessageType* left, const MessageType* right ) const { return ( get< 0 >( left->payload ) > get< 0 >( right->payload ) ); } }; } namespace Concurrency { // Type - 데이터 타입, PriorityType - 우선 순위 타입, PredicatorType - 비교 함수 객체 // source 와 target 역할을 하므로 propagator_block 을 상속받고, 다중 입력 연결과 다중 출력 연결을 허용하므로 모두 multi_link_registry 를 사용. template< class Type, typename PriorityType = int, typename PredicatorType = std::less< message< std::tuple< PriorityType, Type > >* > > class priority_buffer : public propagator_block< multi_link_registry< ITarget< Type > >, multi_link_registry< ISource< std::tuple< PriorityType, Type > > > > { public: ~priority_buffer() { // 연결을 모두 해제하기 위해 propagator_block 의 remove_network_links() 를 사용. this->remove_network_links(); } priority_buffer() { // source 와 target 을 초기화 하기 위해 propagator_block 의 initialize_source_and_target() 을 사용. this->initialize_source_and_target(); } priority_buffer( filter_method const& filter ) { this->intialize_source_and_target(); // 필터 함수를 등록하기 위해 target_block 의 register_filter() 를 사용. this->register_filter( filter );/ } priority_buffer( Scheduler& scheduler ) { this->initialize_source_and_target( &scheduler ); } priority_buffer( Scheduler& scheduler, filter_method const& filter ) { this->initialize_source_and_target( &scheduler ); this->register_filter( filter ); } priority_buffer( ScheduleGroup& schedule_group ) { this->initialize_source_and_target( nullptr, &schedule_group ); } priority_buffer( ScheduleGroup& schedule_group, filter_method const& filter ) { this->initialize_source_and_target( nullptr, &schedule_group ); this->register_filter( filter ); } // 공개 멤버 함수들. bool enqueue( Type const& item ) { return Concurrency::asend< Type >( this, item ); } Type dequeue() { return Concurrency::receive< Type >( this ); } protected: // message 처리하기 위해 런타임으로 부터 호출됨. message 를 입력 큐에서 출력 큐로 이동하고, 전달. virtual void propagate_to_any_targets( message< _Target_type >* ) { message< _Source_type >* input_message = nullptr; { critical_section::scoped_lock lock( this->input_lock ); // 보낼 message 를 우선 순위 큐에서 꺼냄. if( this->input_messages.size() > 0 ) { input_message = this->input_messages.top(); this->input_messages.pop(); } } if( nullptr != input_message ) { // 입력된 message 는 우선 순위와 데이터를 같이 가지고 있으므로 데이터만 가진 message 로 가공하여 출력 큐에 넣음. message< _Target_type >* output_message = new message< _Target_type >( get< 1 >( input_message->payload ) ); this->output_messages.push( output_message ); delete input_message; if( this->output_messages.front()->msg_id() != output_message->msg_id() ) { return; } } // message 보내기. this->propagate_priority_order(); } // target block 이 message 를 받기 위해 호출함. 이 코드에서는 출력 큐에서 message 를 제거해서 소유권을 이전. virtual message< _Target_type >* accept_message( runtime_object_identity msg_id ) { message< _Target_type >* message = nullptr; if( !this->output_messages.empty() && this->output_messages.front()->msg_id() == msg_id ) { message = this->output_messages.front(); this->output_messages.pop(); } return message; } // target block 이 제공된 message 를 예약하기 위해 호출. 이 코드에서는 전달 가능 여부를 확인. virtual bool reserve_message( runtime_object_identity msg_id ) { return ( !this->output_messages.empty() && this->output_messages.front()->msg_id() == msg_id ); } // target block 이 제공된 message 를 사용하기 위해 호출. 이 코드에서는 message 전달. virtual message< Type >* consume_message( runtime_object_identity msg_id ) { return this->accept_message( msg_id ); } // target block 이 예약된 message 를 취소하기 위해 호출. 이 코드에서는 아무 역할을 하지 않음. virtual void release_message( runtime_object_identity msg_id ) { if( this->output_messages.empty() || this->output_messages.front()->msg_id() != msg_id ) { throw message_not_found(); } } // 예약된 message 처리 후, 계속해서 진행. virtual void resume_propagation() { if( this->output_messages.size() > 0 ) this->async_send( nullptr ); } // 새로운 target 이 연결되었음을 알림. virtual void link_target_notification( ITarget< _Target_type >* ) { // 이미 예약된 message 가 있으면 전달하지 않음. if( this->_M_pReservedFor != nullptr ) return; // message 보내기. this->propagate_priority_order(); } // 비 동기적으로 전달. propagator_block 의 propagate() 에 의해 호출됨. virtual message_status propagate_message( message< _Source_type >* message, ISource< _Source_type >* source ) { message = source->accept( message->msg_id(), this ); if( nullptr != message ) { { critical_section::scoped_lock lock( this->input_lock ); this->input_messages.push( message ); } this->async_send( nullptr ); return accepted; }else return missed; } // 동기적으로 전달. propagator_block 의 send() 에 의해 호출됨. virtual message_status send_message( message< _Source_type >* message, ISource< _Source_type >* source ) { message = source->accept( message->msg_id(), this ); if( nullptr != message ) { { critical_section::scoped_lock lock( this->input_lock ); this->input_messages.push( message ); } this->sync_send( nullptr ); return accepted; }else return missed; } private: // message 를 보내는 함수. void propagate_priority_order() { // 이미 예약된 message 가 있으면 전달하지 않음. if( nullptr != this->_M_pReservedFor ) return; // 출력 큐의 모든 message 를 보냄. while( !this->output_messages.empty() ) { message< _Target_type >* message = this->output_messages.front(); message_status status = declined; // 연결된 target 을 순회하면서 message 를 전달. for( target_iterator iter = this->_M_connectedTargets.begin(); nullptr != *iter; ++iter ) { ITarget< _Target_type >* target = *iter; status = target->propagate( message, this ); if( accepted == status ) break; if( nullptr != this->_M_pReservedFor ) break; } if( accepted != status ) break; } } private: std::priority_queue< message< _Source_type >*, std::vector< message< _Source_type >* >, PredicatorType > input_messages; std::queue< message< _Target_type >* > output_messages; critical_section input_lock; private: priority_buffer const& operator=( priority_buffer const& ); priority_buffer( priority_buffer const& ); }; }
[ 코드2. priority_buffer 구현 ]
재정의하는 함수들이 많고, 어디서 어떻게 호출되는지 알기 어려울 수 있습니다. 간단하게 설명하자면 외부로부터 보내는 함수, send() 또는 asend() 에 의해서 message 가 전달될 경우, target block 의 역할을 하게 됩니다. 이 때는 propagate_message() 나 send_message() 중 하나가 호출됩니다.
반대로 외부로부터 받는 함수, receive() 또는 try_receive() 에 의해서 message 를 전달해야 하는 경우, source block 의 역할을 하게 되고, 나머지 함수들이 호출되게 됩니다.
이 예제로 완벽하게는 아니더라도 어떻게 사용자 정의 message block 을 구현해야 하는지 아셨을 것이라 생각됩니다.
사용 코드
#include <ppl.h> #include <iostream> #include "priority_buffer.h" using namespace Concurrency; using namespace std; int main() { priority_buffer< int > buffer; parallel_invoke( [ &buffer ] { for( unsigned int i = 0; i < 25; ++i ) asend( buffer, make_tuple( 1, 12 ) ); }, [ &buffer ] { for( unsigned int i = 0; i < 25; ++i ) asend( buffer, make_tuple( 3, 36 ) ); }, [ &buffer ] { for( unsigned int i = 0; i < 25; ++i ) asend( buffer, make_tuple( 2, 24 ) ); } ); for( unsigned int i = 0; i < 75; ++i ) { wcout << receive( buffer ) << L' '; if( ( i + 1 ) % 25 == 0 ) wcout << endl; } }
[ 코드3. priority_buffer 를 사용하는 예제 ]
parallel_invoke() 를 사용하기 때문에 순서를 입력되는 순서를 보장할 수 없습니다. 하지만 receive() 를 사용하여 message 를 출력해 보면 우선 순위가 큰 순서대로 출력되는 것을 알 수 있습니다.
[ 그림1. priority_buffer 를 사용하는 예제 결과 ]
마치는 글
사용자 정의 message block 을 구현하는 것을 마지막으로 AAL 에 관련된 글을 마무리합니다. AAL 에 관련해 새로운 소식이 있으면 그 때 다시 AAL 에 관련된 글을 작성하도록 하겠습니다.
다음 글은 AAL 을 포함하는 Concurrency Runtime 에서 제공하는 작업 스케줄러에 대해서 작성해 볼 예정입니다. 작업 스케줄러는 AAL 의 message block 과 함께 유용하게 사용할 수 있으므로 AAL 에 관심이 많으신 분들도 기대하셔도 좋을 것 같습니다.
'VC++ 10 Concurrency Runtime' 카테고리의 다른 글
Concurrency Runtime – Task Scheduler 1. ( Scheduler ) (0) | 2010.09.02 |
---|---|
Concurrency Runtime - 만델브로트 프랙탈 ( Mandelbrot Fractal ) 예제 (3) | 2010.08.28 |
Concurrency Runtime – 동기화 객체 2. ( event ) (0) | 2010.08.18 |
Concurrency Runtime – 동기화 객체 1. ( critical_section & reader_writer_lock ) (0) | 2010.08.15 |
Asynchronous Agents Library – message block 8. ( timer ) (0) | 2010.08.12 |