Asynchronous Agents Library
– message block 9. ( custom )

작성자: 임준환( mumbi at daum dot net )

 

시작하는 글

 아마도 이 글이 Asynchronous Agents Library( 이하 AAL ) 에 관련된 마지막 글이라고 생각이 됩니다. 더 추가적으로 적을 내용이 생기면 더 적어 보도록 하겠습니다.

 이전 글까지 AAL 에서 제공하는 message block 들을 알아보았지만, 제공되는 것들이 마음에 들지 않는 분들은 직접 message block 을 만들어 사용할 수 있습니다.

 직접 message block 을 만들어 사용하는 것이 그리 간단하지만은 않기 때문에 이번 글이 좀 길어질 것 같습니다. 천천히 읽어주시기 바랍니다.

 

Message block 의 종류와 인터페이스

 우선 message block 정의에 앞서 message block 의 종류에 대해서 알아보는 것이 좋겠습니다. 이미 이전 글들에서 본 제공되는 message block 들을 보시면서 아셨겠지만, 한 번 더 짚고 넘어가도록 하겠습니다.

 Message 를 보내는 역할을 하는 message block 을 source block 이라 하고, message 를 받는 역할을 하는 역할을 하는 message block 을 target block 이라고 합니다.

 이 두 가지 종류의 block 들의 기본적인 행동들을 알리기 위해서 source block 들은 ISource 인터페이스를, target block 들은 ITarget 인터페이스를 상속받아 인터페이스 메소드들을 정의해야 합니다.

 

Message block 의 기본 클래스들

 위에서 언급한 ISource 인터페이스나 ITarget 인터페이스를 직접 상속받아 정의해도 되지만, AAL 에서는 그 인터페이스를 상속받아 정의하기 쉽도록 해주는 기본 클래스 3 가지를 제공합니다.

  • source_blockISource 인터페이스를 상속받고, 다른 block 에 message 를 보냅니다.
  • target_blockITarget 인터페이스를 상속받고, 다른 block 으로부터 message 를 받습니다.
  • propagator_blockISource 인터페이스와 ITarget 인터페이스를 상속받고, 다른 block 들과 message 를 보내고 받습니다.

 

 AAL 에서는 사용자 정의 message block 을 정의할 때, 인터페이스를 직접 상속받기 보다는 위의 기본 클래스들을 상속받아 정의하는 것을 권고하고 있습니다.

 

연결 관리 및 message 관리를 위한 템플릿 매개변수에 사용할 수 있는 클래스 템플릿

  위의 3가지 기본 클래스들은 클래스 템플릿으로, message block 들간의 연결을 어떻게 관리할 것인지, message 들을 어떻게 관리할 것인지에 대한 정보를 템플릿 매개변수를 통해 지정할 수 있습니다.

 AAL 은 연결 관리를 위한 2가지의  클래스 템플릿을 제공하고 있습니다.

  • single_link_registry – source 나 target 의 하나의 연결만 허용.
  • multi_link_registry – source 나 target 의 여럿의 연결을 허용.

 

 예를 들면, AAL 에서 제공하는 transformer 는 출력을 하나의 연결만 허용하는 single_link_registry 를 사용하고, 입력은 여럿의 연결을 허용하는 multi_link_registry 를 사용하고 있습니다.

template<class _Input, class _Output>
class transformer : public propagator_block<single_link_registry<ITarget<_Output>>, multi_link_registry<ISource<_Input>>>

[ 코드1. transformer 에서 사용되는 single_link_registry 와 multi_link_registry ]

 

 또한 message 관리를 위한 하나의 클래스 템플릿을 제공합니다.

  • ordered_mesage_processor – message 를 받는 순서대로 처리하도록 허용.

 

재정의해야 할 멤버 함수들

 위에 말한 기본 클래스들을 상속한 클래스는 다음과 같은 멤버 함수를 재정의해야 합니다.

 

source_block 을 상속한 클래스가 재정의해야 할 멤버 함수들

  • propagate_to_any_targets()
  • accept_message()
  • reserve_message()
  • consume_message()
  • release_message()
  • resume_propagation()

 

propagate_to_any_targets()

 입력 message 나 출력 message 를 비 동기 또는 동기적으로 처리하기 위해 런타임으로부터 호출됩니다.

 

accept_message()

 Message 를 수락하기 위해 target block 으로부터 호출됩니다.

 

reserve_message(), consume_message(), release_message(), resume_propagation()

 이 멤버 함수들은 message 를 예약하는 기능을 제공합니다.

 target block 은 message 를 제공받을 때와 message 를 나중에 사용하기 위해 예약해야 할 때 reserve_message() 를 호출합니다.

 target block 은 하나의 message 를 예약한 후에, message 를 사용하기 위해 consume_message() 를 호출하거나 예약을 취소하기 위해 release_message() 를 호출할 수 있습니다.

 consume_message() 는 accept_message() 와 함께 사용되어 message 의 소유권을 보내거나 message 의 복사본을 보내도록 할 수 있습니다.

 예를 들어, unbounded_buffer 와 같은 message block 은 오직 하나의 target 에만 message 를 보냅니다. 즉, message 의 소유권을 target 에 보냅니다. 그렇기 때문에 보낸 후, message block 내에는 보낸 message 가 남아있지 않습니다.

 반면에 overwrite_buffer 와 같은 message block 은 각 연결된 target 에 각각 message 를 제공합니다. 즉, message 의 복사본을 보냅니다, 그래서 보낸 후, message block 내에 보낸 message 가 여전히 존재하게 됩니다.

 target block 이 예약된 message 를 사용하거나 취소한 후에, 런타임은 resume_propagation() 을 호출합니다. resume_propagation() 은 큐( queue )의 다음 message 를 시작으로 message 의 전달을 계속해서 진행시킵니다.

 

target_block 을 상속한 클래스가 재정의해야 할 멤버 함수들

  • propagate_message()
  • send_message() ( option )

 

propagate_message(), send_message()

 런타임은 다른 block 으로부터 현재의 block 으로 message 를 비 동기적으로 받기 위해 propagate_message() 를 호출합니다.

send_message() 는 비 동기적이 아니라 동기적으로 message 를 보내는 것을 제외하면 propagate_message() 와 비슷합니다.

 기본적으로 send_message() 의 정의는 모든 입력 message 를 거부하도록 구현되어 있습니다.

 만약 message 가 target block 에 설정된 필터 함수를 통과하지 못하면 send_message() 나 propagate_message() 를 호출하지 않습니다.

 

propagator_block 을 상속한 클래스가 재정의해야 할 멤버 함수들

  propagator_blocksource_blocktarget_block 의 특징을 모두 상속하므로 두 클래스에 대해 재정의해야 할 멤버 함수들을 모두 재정의해야 합니다.

 

예제: priority_buffer

 priority_buffer 는 우선 순위를 기준으로 순서를 정하는 사용자 정의 message block 입니다. 여럿의 message 들을 받을 때 우선 순위대로 처리할 때 사용하기 유용합니다.

 Message 큐를 가지고 있고, source 와 target 의 역할을 하며, 여럿의 source 와 여럿의 target 을 연결할 수 있기 때문에 unbounded_buffer 와 비슷합니다.

 그러나 unbounded_buffer 는 message 를 받는 순서를 기반으로 message 를 전달한다는 것이 다릅니다.

 priority_buffer 는 우선 순위와 데이터를 같이 전달해야 하므로 우선 순위 타입과 데이터의 타입을 포함하는 tuple 타입의 message 를 전달합니다.

 priority_buffer 는 2개의 message 큐를 관리합니다. 입력 message 를 위한 std::priority_queue 와 출력 message 를 위한 std::queue 입니다.

 priority_buffer 의 정의를 위해 propagator_block 을 상속하고 7개의 멤버 함수들을 정의해야하고, link_target_notification() 과 send_message() 를 재정의해야 합니다.

 또한 2개의 public 멤버 함수인 enqueue() 와 dequeue() 를 정의합니다. private 멤버 함수로 propagate_priority_order() 를 정의합니다.

 

코드

#include <agents.h>
#include <queue>

// 우선 순위를 비교할 비교 함수 객체.
namespace std
{
	template< class Type, class PriorityType >
	struct less< Concurrency::message< tuple< PriorityType, Type > >* >
	{
		typedef Concurrency::message< tuple< PriorityType, Type > >		MessageType;

		bool operator()( const MessageType* left, const MessageType* right ) const
		{
			// message 가 tuple 이므로 get() 을 사용.
			return ( get< 0 >( left->payload ) < get< 0 >( right->payload ) );
		}
	};

	template< class Type, class PriorityType >
	struct greater< Concurrency::message< tuple< PriorityType, Type > >* >
	{
		typedef Concurrency::message< tuple< PriorityType, Type > >		MessageType;

		bool operator()( const MessageType* left, const MessageType* right ) const
		{
			return ( get< 0 >( left->payload ) > get< 0 >( right->payload ) );
		}
	};
}

namespace Concurrency
{
	// Type - 데이터 타입, PriorityType - 우선 순위 타입, PredicatorType - 비교 함수 객체
	// source 와 target 역할을 하므로 propagator_block 을 상속받고, 다중 입력 연결과 다중 출력 연결을 허용하므로 모두 multi_link_registry 를 사용.
	template< class Type, typename PriorityType = int, typename PredicatorType = std::less< message< std::tuple< PriorityType, Type > >* > >
	class priority_buffer
		: public propagator_block< multi_link_registry< ITarget< Type > >, multi_link_registry< ISource< std::tuple< PriorityType, Type > > > >
	{
	public:
		~priority_buffer()
		{
			// 연결을 모두 해제하기 위해 propagator_block 의 remove_network_links() 를 사용.
			this->remove_network_links();
		}

		priority_buffer()
		{
			// source 와 target 을 초기화 하기 위해 propagator_block 의 initialize_source_and_target() 을 사용.
			this->initialize_source_and_target();
		}

		priority_buffer( filter_method const& filter )
		{
			this->intialize_source_and_target();

			// 필터 함수를 등록하기 위해 target_block 의 register_filter() 를 사용.
			this->register_filter( filter );/
		}

		priority_buffer( Scheduler& scheduler )
		{
			this->initialize_source_and_target( &scheduler );
		}

		priority_buffer( Scheduler& scheduler, filter_method const& filter )
		{
			this->initialize_source_and_target( &scheduler );
			this->register_filter( filter );
		}

		priority_buffer( ScheduleGroup& schedule_group )
		{
			this->initialize_source_and_target( nullptr, &schedule_group );
		}

		priority_buffer( ScheduleGroup& schedule_group, filter_method const& filter )
		{
			this->initialize_source_and_target( nullptr, &schedule_group );
			this->register_filter( filter );
		}

		// 공개 멤버 함수들.
		bool enqueue( Type const& item )
		{
			return Concurrency::asend< Type >( this, item );
		}

		Type dequeue()
		{
			return Concurrency::receive< Type >( this );
		}

	protected:
		// message 처리하기 위해 런타임으로 부터 호출됨. message 를 입력 큐에서 출력 큐로 이동하고, 전달.
		virtual void propagate_to_any_targets( message< _Target_type >* )
		{
			message< _Source_type >* input_message = nullptr;

			{
				critical_section::scoped_lock lock( this->input_lock );

				// 보낼 message 를 우선 순위 큐에서 꺼냄.
				if( this->input_messages.size() > 0 )
				{
					input_message = this->input_messages.top();
					this->input_messages.pop();
				}
			}

			if( nullptr != input_message )
			{
				// 입력된 message 는 우선 순위와 데이터를 같이 가지고 있으므로 데이터만 가진 message 로 가공하여 출력 큐에 넣음.
				message< _Target_type >* output_message = new message< _Target_type >( get< 1 >( input_message->payload ) );
				this->output_messages.push( output_message );

				delete input_message;

				if( this->output_messages.front()->msg_id() != output_message->msg_id() )
				{
					return;
				}
			}

			// message 보내기.
			this->propagate_priority_order();
		}

		// target block 이 message 를 받기 위해 호출함. 이 코드에서는 출력 큐에서 message 를 제거해서 소유권을 이전.
		virtual message< _Target_type >* accept_message( runtime_object_identity msg_id )
		{
			message< _Target_type >* message = nullptr;

			if( !this->output_messages.empty() && this->output_messages.front()->msg_id() == msg_id )
			{
				message = this->output_messages.front();
				this->output_messages.pop();
			}

			return message;
		}

		// target block 이 제공된 message 를 예약하기 위해 호출. 이 코드에서는 전달 가능 여부를 확인.
		virtual bool reserve_message( runtime_object_identity msg_id )
		{
			return ( !this->output_messages.empty() && this->output_messages.front()->msg_id() == msg_id );
		}

		// target block 이 제공된 message 를 사용하기 위해 호출. 이 코드에서는 message 전달.
		virtual message< Type >* consume_message( runtime_object_identity msg_id )
		{
			return this->accept_message( msg_id );
		}

		// target block 이 예약된 message 를 취소하기 위해 호출. 이 코드에서는 아무 역할을 하지 않음.
		virtual void release_message( runtime_object_identity msg_id )
		{
			if( this->output_messages.empty() || this->output_messages.front()->msg_id() != msg_id )
			{
				throw message_not_found();
			}
		}

		// 예약된 message 처리 후, 계속해서 진행.
		virtual void resume_propagation()
		{
			if( this->output_messages.size() > 0 )
				this->async_send( nullptr );
		}

		// 새로운 target 이 연결되었음을 알림.
		virtual void link_target_notification( ITarget< _Target_type >* )
		{
			// 이미 예약된 message 가 있으면 전달하지 않음.
			if( this->_M_pReservedFor != nullptr )
				return;

			// message 보내기.
			this->propagate_priority_order();
		}

		// 비 동기적으로 전달. propagator_block 의 propagate() 에 의해 호출됨.
		virtual message_status propagate_message( message< _Source_type >* message, ISource< _Source_type >* source )
		{
			message = source->accept( message->msg_id(), this );

			if( nullptr != message )
			{
				{
					critical_section::scoped_lock lock( this->input_lock );
					this->input_messages.push( message );
				}

				this->async_send( nullptr );

				return accepted;
			}else
				return missed;
		}

		// 동기적으로 전달. propagator_block 의 send() 에 의해 호출됨.
		virtual message_status send_message( message< _Source_type >* message, ISource< _Source_type >* source )
		{
			message = source->accept( message->msg_id(), this );

			if( nullptr != message )
			{
				{
					critical_section::scoped_lock lock( this->input_lock );
					this->input_messages.push( message );
				}

				this->sync_send( nullptr );

				return accepted;
			}else
				return missed;
		}

	private:
		// message 를 보내는 함수.
		void propagate_priority_order()
		{
			// 이미 예약된 message 가 있으면 전달하지 않음.
			if( nullptr != this->_M_pReservedFor )
				return;

			// 출력 큐의 모든 message 를 보냄.
			while( !this->output_messages.empty() )
			{
				message< _Target_type >* message = this->output_messages.front();

				message_status status = declined;

				// 연결된 target 을 순회하면서 message 를 전달.
				for( target_iterator iter = this->_M_connectedTargets.begin();
					nullptr != *iter;
					++iter )
				{
					ITarget< _Target_type >* target = *iter;
					status = target->propagate( message, this );

					if( accepted == status )
						break;

					if( nullptr != this->_M_pReservedFor )
						break;		 
				}

				if( accepted != status )
					break;
			}
		}

	private:
		std::priority_queue<
			message< _Source_type >*,
			std::vector< message< _Source_type >* >,
			PredicatorType >							input_messages;		
		std::queue< message< _Target_type >* >			output_messages;
		critical_section								input_lock;

	private:
		priority_buffer const& operator=( priority_buffer const& );
		priority_buffer( priority_buffer const&  );
	};
}

[ 코드2. priority_buffer 구현 ]

 재정의하는 함수들이 많고, 어디서 어떻게 호출되는지 알기 어려울 수 있습니다. 간단하게 설명하자면 외부로부터 보내는 함수, send() 또는 asend() 에 의해서 message 가 전달될 경우, target block 의 역할을 하게 됩니다. 이 때는 propagate_message() 나 send_message() 중 하나가 호출됩니다.

 반대로 외부로부터 받는 함수, receive() 또는 try_receive() 에 의해서 message 를 전달해야 하는 경우, source block 의 역할을 하게 되고, 나머지 함수들이 호출되게 됩니다.

 이 예제로 완벽하게는 아니더라도 어떻게 사용자 정의 message block 을 구현해야 하는지 아셨을 것이라 생각됩니다.

 

사용 코드

#include <ppl.h>
#include <iostream>
#include "priority_buffer.h"

using namespace Concurrency;
using namespace std;

int main()
{
	priority_buffer< int > buffer;

	parallel_invoke(
		[ &buffer ] { for( unsigned int i = 0; i < 25; ++i ) asend( buffer, make_tuple( 1, 12 ) ); },
		[ &buffer ] { for( unsigned int i = 0; i < 25; ++i ) asend( buffer, make_tuple( 3, 36 ) ); },
		[ &buffer ] { for( unsigned int i = 0; i < 25; ++i ) asend( buffer, make_tuple( 2, 24 ) ); } );

	for( unsigned int i = 0; i < 75; ++i )
	{
		wcout << receive( buffer ) << L' ';
		if( ( i + 1 ) % 25 == 0 )
			wcout << endl;
	}
}

[ 코드3. priority_buffer 를 사용하는 예제 ]

 parallel_invoke() 를 사용하기 때문에 순서를 입력되는 순서를 보장할 수 없습니다. 하지만 receive() 를 사용하여 message 를 출력해 보면 우선 순위가 큰 순서대로 출력되는 것을 알 수 있습니다.


[ 그림1. priority_buffer 를 사용하는 예제 결과 ]

[ 그림1. priority_buffer 를 사용하는 예제 결과 ]


 

마치는 글

 사용자 정의 message block 을 구현하는 것을 마지막으로 AAL 에 관련된 글을 마무리합니다. AAL 에 관련해 새로운 소식이 있으면 그 때 다시 AAL 에 관련된 글을 작성하도록 하겠습니다.

 다음 글은 AAL 을 포함하는 Concurrency Runtime 에서 제공하는 작업 스케줄러에 대해서 작성해 볼 예정입니다. 작업 스케줄러는 AAL 의 message block 과 함께 유용하게 사용할 수 있으므로 AAL 에 관심이 많으신 분들도 기대하셔도 좋을 것 같습니다.