Asynchronous Agents Library
– message block 9. ( custom )

작성자: 임준환( mumbi at daum dot net )

 

시작하는 글

 아마도 이 글이 Asynchronous Agents Library( 이하 AAL ) 에 관련된 마지막 글이라고 생각이 됩니다. 더 추가적으로 적을 내용이 생기면 더 적어 보도록 하겠습니다.

 이전 글까지 AAL 에서 제공하는 message block 들을 알아보았지만, 제공되는 것들이 마음에 들지 않는 분들은 직접 message block 을 만들어 사용할 수 있습니다.

 직접 message block 을 만들어 사용하는 것이 그리 간단하지만은 않기 때문에 이번 글이 좀 길어질 것 같습니다. 천천히 읽어주시기 바랍니다.

 

Message block 의 종류와 인터페이스

 우선 message block 정의에 앞서 message block 의 종류에 대해서 알아보는 것이 좋겠습니다. 이미 이전 글들에서 본 제공되는 message block 들을 보시면서 아셨겠지만, 한 번 더 짚고 넘어가도록 하겠습니다.

 Message 를 보내는 역할을 하는 message block 을 source block 이라 하고, message 를 받는 역할을 하는 역할을 하는 message block 을 target block 이라고 합니다.

 이 두 가지 종류의 block 들의 기본적인 행동들을 알리기 위해서 source block 들은 ISource 인터페이스를, target block 들은 ITarget 인터페이스를 상속받아 인터페이스 메소드들을 정의해야 합니다.

 

Message block 의 기본 클래스들

 위에서 언급한 ISource 인터페이스나 ITarget 인터페이스를 직접 상속받아 정의해도 되지만, AAL 에서는 그 인터페이스를 상속받아 정의하기 쉽도록 해주는 기본 클래스 3 가지를 제공합니다.

  • source_blockISource 인터페이스를 상속받고, 다른 block 에 message 를 보냅니다.
  • target_blockITarget 인터페이스를 상속받고, 다른 block 으로부터 message 를 받습니다.
  • propagator_blockISource 인터페이스와 ITarget 인터페이스를 상속받고, 다른 block 들과 message 를 보내고 받습니다.

 

 AAL 에서는 사용자 정의 message block 을 정의할 때, 인터페이스를 직접 상속받기 보다는 위의 기본 클래스들을 상속받아 정의하는 것을 권고하고 있습니다.

 

연결 관리 및 message 관리를 위한 템플릿 매개변수에 사용할 수 있는 클래스 템플릿

  위의 3가지 기본 클래스들은 클래스 템플릿으로, message block 들간의 연결을 어떻게 관리할 것인지, message 들을 어떻게 관리할 것인지에 대한 정보를 템플릿 매개변수를 통해 지정할 수 있습니다.

 AAL 은 연결 관리를 위한 2가지의  클래스 템플릿을 제공하고 있습니다.

  • single_link_registry – source 나 target 의 하나의 연결만 허용.
  • multi_link_registry – source 나 target 의 여럿의 연결을 허용.

 

 예를 들면, AAL 에서 제공하는 transformer 는 출력을 하나의 연결만 허용하는 single_link_registry 를 사용하고, 입력은 여럿의 연결을 허용하는 multi_link_registry 를 사용하고 있습니다.

template<class _Input, class _Output>
class transformer : public propagator_block<single_link_registry<ITarget<_Output>>, multi_link_registry<ISource<_Input>>>

[ 코드1. transformer 에서 사용되는 single_link_registry 와 multi_link_registry ]

 

 또한 message 관리를 위한 하나의 클래스 템플릿을 제공합니다.

  • ordered_mesage_processor – message 를 받는 순서대로 처리하도록 허용.

 

재정의해야 할 멤버 함수들

 위에 말한 기본 클래스들을 상속한 클래스는 다음과 같은 멤버 함수를 재정의해야 합니다.

 

source_block 을 상속한 클래스가 재정의해야 할 멤버 함수들

  • propagate_to_any_targets()
  • accept_message()
  • reserve_message()
  • consume_message()
  • release_message()
  • resume_propagation()

 

propagate_to_any_targets()

 입력 message 나 출력 message 를 비 동기 또는 동기적으로 처리하기 위해 런타임으로부터 호출됩니다.

 

accept_message()

 Message 를 수락하기 위해 target block 으로부터 호출됩니다.

 

reserve_message(), consume_message(), release_message(), resume_propagation()

 이 멤버 함수들은 message 를 예약하는 기능을 제공합니다.

 target block 은 message 를 제공받을 때와 message 를 나중에 사용하기 위해 예약해야 할 때 reserve_message() 를 호출합니다.

 target block 은 하나의 message 를 예약한 후에, message 를 사용하기 위해 consume_message() 를 호출하거나 예약을 취소하기 위해 release_message() 를 호출할 수 있습니다.

 consume_message() 는 accept_message() 와 함께 사용되어 message 의 소유권을 보내거나 message 의 복사본을 보내도록 할 수 있습니다.

 예를 들어, unbounded_buffer 와 같은 message block 은 오직 하나의 target 에만 message 를 보냅니다. 즉, message 의 소유권을 target 에 보냅니다. 그렇기 때문에 보낸 후, message block 내에는 보낸 message 가 남아있지 않습니다.

 반면에 overwrite_buffer 와 같은 message block 은 각 연결된 target 에 각각 message 를 제공합니다. 즉, message 의 복사본을 보냅니다, 그래서 보낸 후, message block 내에 보낸 message 가 여전히 존재하게 됩니다.

 target block 이 예약된 message 를 사용하거나 취소한 후에, 런타임은 resume_propagation() 을 호출합니다. resume_propagation() 은 큐( queue )의 다음 message 를 시작으로 message 의 전달을 계속해서 진행시킵니다.

 

target_block 을 상속한 클래스가 재정의해야 할 멤버 함수들

  • propagate_message()
  • send_message() ( option )

 

propagate_message(), send_message()

 런타임은 다른 block 으로부터 현재의 block 으로 message 를 비 동기적으로 받기 위해 propagate_message() 를 호출합니다.

send_message() 는 비 동기적이 아니라 동기적으로 message 를 보내는 것을 제외하면 propagate_message() 와 비슷합니다.

 기본적으로 send_message() 의 정의는 모든 입력 message 를 거부하도록 구현되어 있습니다.

 만약 message 가 target block 에 설정된 필터 함수를 통과하지 못하면 send_message() 나 propagate_message() 를 호출하지 않습니다.

 

propagator_block 을 상속한 클래스가 재정의해야 할 멤버 함수들

  propagator_blocksource_blocktarget_block 의 특징을 모두 상속하므로 두 클래스에 대해 재정의해야 할 멤버 함수들을 모두 재정의해야 합니다.

 

예제: priority_buffer

 priority_buffer 는 우선 순위를 기준으로 순서를 정하는 사용자 정의 message block 입니다. 여럿의 message 들을 받을 때 우선 순위대로 처리할 때 사용하기 유용합니다.

 Message 큐를 가지고 있고, source 와 target 의 역할을 하며, 여럿의 source 와 여럿의 target 을 연결할 수 있기 때문에 unbounded_buffer 와 비슷합니다.

 그러나 unbounded_buffer 는 message 를 받는 순서를 기반으로 message 를 전달한다는 것이 다릅니다.

 priority_buffer 는 우선 순위와 데이터를 같이 전달해야 하므로 우선 순위 타입과 데이터의 타입을 포함하는 tuple 타입의 message 를 전달합니다.

 priority_buffer 는 2개의 message 큐를 관리합니다. 입력 message 를 위한 std::priority_queue 와 출력 message 를 위한 std::queue 입니다.

 priority_buffer 의 정의를 위해 propagator_block 을 상속하고 7개의 멤버 함수들을 정의해야하고, link_target_notification() 과 send_message() 를 재정의해야 합니다.

 또한 2개의 public 멤버 함수인 enqueue() 와 dequeue() 를 정의합니다. private 멤버 함수로 propagate_priority_order() 를 정의합니다.

 

코드

#include <agents.h>
#include <queue>

// 우선 순위를 비교할 비교 함수 객체.
namespace std
{
	template< class Type, class PriorityType >
	struct less< Concurrency::message< tuple< PriorityType, Type > >* >
	{
		typedef Concurrency::message< tuple< PriorityType, Type > >		MessageType;

		bool operator()( const MessageType* left, const MessageType* right ) const
		{
			// message 가 tuple 이므로 get() 을 사용.
			return ( get< 0 >( left->payload ) < get< 0 >( right->payload ) );
		}
	};

	template< class Type, class PriorityType >
	struct greater< Concurrency::message< tuple< PriorityType, Type > >* >
	{
		typedef Concurrency::message< tuple< PriorityType, Type > >		MessageType;

		bool operator()( const MessageType* left, const MessageType* right ) const
		{
			return ( get< 0 >( left->payload ) > get< 0 >( right->payload ) );
		}
	};
}

namespace Concurrency
{
	// Type - 데이터 타입, PriorityType - 우선 순위 타입, PredicatorType - 비교 함수 객체
	// source 와 target 역할을 하므로 propagator_block 을 상속받고, 다중 입력 연결과 다중 출력 연결을 허용하므로 모두 multi_link_registry 를 사용.
	template< class Type, typename PriorityType = int, typename PredicatorType = std::less< message< std::tuple< PriorityType, Type > >* > >
	class priority_buffer
		: public propagator_block< multi_link_registry< ITarget< Type > >, multi_link_registry< ISource< std::tuple< PriorityType, Type > > > >
	{
	public:
		~priority_buffer()
		{
			// 연결을 모두 해제하기 위해 propagator_block 의 remove_network_links() 를 사용.
			this->remove_network_links();
		}

		priority_buffer()
		{
			// source 와 target 을 초기화 하기 위해 propagator_block 의 initialize_source_and_target() 을 사용.
			this->initialize_source_and_target();
		}

		priority_buffer( filter_method const& filter )
		{
			this->intialize_source_and_target();

			// 필터 함수를 등록하기 위해 target_block 의 register_filter() 를 사용.
			this->register_filter( filter );/
		}

		priority_buffer( Scheduler& scheduler )
		{
			this->initialize_source_and_target( &scheduler );
		}

		priority_buffer( Scheduler& scheduler, filter_method const& filter )
		{
			this->initialize_source_and_target( &scheduler );
			this->register_filter( filter );
		}

		priority_buffer( ScheduleGroup& schedule_group )
		{
			this->initialize_source_and_target( nullptr, &schedule_group );
		}

		priority_buffer( ScheduleGroup& schedule_group, filter_method const& filter )
		{
			this->initialize_source_and_target( nullptr, &schedule_group );
			this->register_filter( filter );
		}

		// 공개 멤버 함수들.
		bool enqueue( Type const& item )
		{
			return Concurrency::asend< Type >( this, item );
		}

		Type dequeue()
		{
			return Concurrency::receive< Type >( this );
		}

	protected:
		// message 처리하기 위해 런타임으로 부터 호출됨. message 를 입력 큐에서 출력 큐로 이동하고, 전달.
		virtual void propagate_to_any_targets( message< _Target_type >* )
		{
			message< _Source_type >* input_message = nullptr;

			{
				critical_section::scoped_lock lock( this->input_lock );

				// 보낼 message 를 우선 순위 큐에서 꺼냄.
				if( this->input_messages.size() > 0 )
				{
					input_message = this->input_messages.top();
					this->input_messages.pop();
				}
			}

			if( nullptr != input_message )
			{
				// 입력된 message 는 우선 순위와 데이터를 같이 가지고 있으므로 데이터만 가진 message 로 가공하여 출력 큐에 넣음.
				message< _Target_type >* output_message = new message< _Target_type >( get< 1 >( input_message->payload ) );
				this->output_messages.push( output_message );

				delete input_message;

				if( this->output_messages.front()->msg_id() != output_message->msg_id() )
				{
					return;
				}
			}

			// message 보내기.
			this->propagate_priority_order();
		}

		// target block 이 message 를 받기 위해 호출함. 이 코드에서는 출력 큐에서 message 를 제거해서 소유권을 이전.
		virtual message< _Target_type >* accept_message( runtime_object_identity msg_id )
		{
			message< _Target_type >* message = nullptr;

			if( !this->output_messages.empty() && this->output_messages.front()->msg_id() == msg_id )
			{
				message = this->output_messages.front();
				this->output_messages.pop();
			}

			return message;
		}

		// target block 이 제공된 message 를 예약하기 위해 호출. 이 코드에서는 전달 가능 여부를 확인.
		virtual bool reserve_message( runtime_object_identity msg_id )
		{
			return ( !this->output_messages.empty() && this->output_messages.front()->msg_id() == msg_id );
		}

		// target block 이 제공된 message 를 사용하기 위해 호출. 이 코드에서는 message 전달.
		virtual message< Type >* consume_message( runtime_object_identity msg_id )
		{
			return this->accept_message( msg_id );
		}

		// target block 이 예약된 message 를 취소하기 위해 호출. 이 코드에서는 아무 역할을 하지 않음.
		virtual void release_message( runtime_object_identity msg_id )
		{
			if( this->output_messages.empty() || this->output_messages.front()->msg_id() != msg_id )
			{
				throw message_not_found();
			}
		}

		// 예약된 message 처리 후, 계속해서 진행.
		virtual void resume_propagation()
		{
			if( this->output_messages.size() > 0 )
				this->async_send( nullptr );
		}

		// 새로운 target 이 연결되었음을 알림.
		virtual void link_target_notification( ITarget< _Target_type >* )
		{
			// 이미 예약된 message 가 있으면 전달하지 않음.
			if( this->_M_pReservedFor != nullptr )
				return;

			// message 보내기.
			this->propagate_priority_order();
		}

		// 비 동기적으로 전달. propagator_block 의 propagate() 에 의해 호출됨.
		virtual message_status propagate_message( message< _Source_type >* message, ISource< _Source_type >* source )
		{
			message = source->accept( message->msg_id(), this );

			if( nullptr != message )
			{
				{
					critical_section::scoped_lock lock( this->input_lock );
					this->input_messages.push( message );
				}

				this->async_send( nullptr );

				return accepted;
			}else
				return missed;
		}

		// 동기적으로 전달. propagator_block 의 send() 에 의해 호출됨.
		virtual message_status send_message( message< _Source_type >* message, ISource< _Source_type >* source )
		{
			message = source->accept( message->msg_id(), this );

			if( nullptr != message )
			{
				{
					critical_section::scoped_lock lock( this->input_lock );
					this->input_messages.push( message );
				}

				this->sync_send( nullptr );

				return accepted;
			}else
				return missed;
		}

	private:
		// message 를 보내는 함수.
		void propagate_priority_order()
		{
			// 이미 예약된 message 가 있으면 전달하지 않음.
			if( nullptr != this->_M_pReservedFor )
				return;

			// 출력 큐의 모든 message 를 보냄.
			while( !this->output_messages.empty() )
			{
				message< _Target_type >* message = this->output_messages.front();

				message_status status = declined;

				// 연결된 target 을 순회하면서 message 를 전달.
				for( target_iterator iter = this->_M_connectedTargets.begin();
					nullptr != *iter;
					++iter )
				{
					ITarget< _Target_type >* target = *iter;
					status = target->propagate( message, this );

					if( accepted == status )
						break;

					if( nullptr != this->_M_pReservedFor )
						break;		 
				}

				if( accepted != status )
					break;
			}
		}

	private:
		std::priority_queue<
			message< _Source_type >*,
			std::vector< message< _Source_type >* >,
			PredicatorType >							input_messages;		
		std::queue< message< _Target_type >* >			output_messages;
		critical_section								input_lock;

	private:
		priority_buffer const& operator=( priority_buffer const& );
		priority_buffer( priority_buffer const&  );
	};
}

[ 코드2. priority_buffer 구현 ]

 재정의하는 함수들이 많고, 어디서 어떻게 호출되는지 알기 어려울 수 있습니다. 간단하게 설명하자면 외부로부터 보내는 함수, send() 또는 asend() 에 의해서 message 가 전달될 경우, target block 의 역할을 하게 됩니다. 이 때는 propagate_message() 나 send_message() 중 하나가 호출됩니다.

 반대로 외부로부터 받는 함수, receive() 또는 try_receive() 에 의해서 message 를 전달해야 하는 경우, source block 의 역할을 하게 되고, 나머지 함수들이 호출되게 됩니다.

 이 예제로 완벽하게는 아니더라도 어떻게 사용자 정의 message block 을 구현해야 하는지 아셨을 것이라 생각됩니다.

 

사용 코드

#include <ppl.h>
#include <iostream>
#include "priority_buffer.h"

using namespace Concurrency;
using namespace std;

int main()
{
	priority_buffer< int > buffer;

	parallel_invoke(
		[ &buffer ] { for( unsigned int i = 0; i < 25; ++i ) asend( buffer, make_tuple( 1, 12 ) ); },
		[ &buffer ] { for( unsigned int i = 0; i < 25; ++i ) asend( buffer, make_tuple( 3, 36 ) ); },
		[ &buffer ] { for( unsigned int i = 0; i < 25; ++i ) asend( buffer, make_tuple( 2, 24 ) ); } );

	for( unsigned int i = 0; i < 75; ++i )
	{
		wcout << receive( buffer ) << L' ';
		if( ( i + 1 ) % 25 == 0 )
			wcout << endl;
	}
}

[ 코드3. priority_buffer 를 사용하는 예제 ]

 parallel_invoke() 를 사용하기 때문에 순서를 입력되는 순서를 보장할 수 없습니다. 하지만 receive() 를 사용하여 message 를 출력해 보면 우선 순위가 큰 순서대로 출력되는 것을 알 수 있습니다.


[ 그림1. priority_buffer 를 사용하는 예제 결과 ]

[ 그림1. priority_buffer 를 사용하는 예제 결과 ]


 

마치는 글

 사용자 정의 message block 을 구현하는 것을 마지막으로 AAL 에 관련된 글을 마무리합니다. AAL 에 관련해 새로운 소식이 있으면 그 때 다시 AAL 에 관련된 글을 작성하도록 하겠습니다.

 다음 글은 AAL 을 포함하는 Concurrency Runtime 에서 제공하는 작업 스케줄러에 대해서 작성해 볼 예정입니다. 작업 스케줄러는 AAL 의 message block 과 함께 유용하게 사용할 수 있으므로 AAL 에 관심이 많으신 분들도 기대하셔도 좋을 것 같습니다.

Asynchronous Agents Library – message block 8. ( timer )

VC++ 10 Concurrency Runtime 2010. 8. 12. 08:30 Posted by 알 수 없는 사용자

Asynchronous Agents Library
– message block 8. ( timer )

작성자: 임준환( mumbi at daum dot net )

 

시작하는 글

 이번 글에서 설명하는 timer 를 마지막으로 Asynchronous Agents Library( 이하 AAL ) 에서 제공하는 모든 message block 들에 대한 소개를 마칩니다.

  • unbouned_buffer
  • overwrite_buffer
  • single_assignment
  • call
  • transformer
  • choice
  • join
  • multitype_join
  • timer

   각각의 message block 들의 특징을 파악하고 필요한 때에 적합한 것들을 사용하시기 바랍니다.

 

timer< _Type >

 timer 는 message 의 전송을 연기하거나, 주기적으로 같은 message 를 전송하는 역할을 합니다.

 안타깝게도 하나의 timer 는 같은 message 밖에 전송할 수 없게 구현되어 있습니다. 또한 하나의 message block 에만 연결하여 전송할 수 있습니다.

 timercall 또는 transformer 를 연결한다면 좀 더 유연하게 확장할 수 있을 것입니다.

 

상태

 timer 는 총 4가지의 상태를 갖습니다.

  • Initialized – 생성된 후, 아직 시작하지 않은 상태.
  • Started – 시작된 상태.
  • Paused – 일시 정지된 상태.
  • Stopped – 정지한 상태.

 

멤버 함수

 생성자 및 public 인 멤버 함수들 중 인터페이스를 재정의한 함수들을 제외하고 알아보도록 하겠습니다.

 

생성자

timer(unsigned int _Ms, _Type const& _Value, ITarget<_Type> *_PTarget = NULL, bool _Repeating = false)

[ 코드1. timer 의 생성자 ]

 첫 번째 매개변수인 _Ms 는 연기되는 시간 또는 주기적인 시간으로 밀리 초를 받습니다.

 두 번째 매개변수인 _Value 는 전송할 message 입니다.

 세 번째 매개변수인 _PTarget 은 전송한 message 를 받을 대상 message block 입니다. 기본 값이 NULL 인데 NULL 인 경우에는 생성된 이후에 link_target() 을 이용하여 연결할 수 있습니다.

 여기서 주의해야 할 점은 timer 는 단 하나의 message block 만을 연결할 수 있다는 것입니다. link_target() 으로 여러 message block 들을 연결할 경우, 런타임 에러를 보게 될 것입니다.

 네 번째 매개변수인 message 의 전송을 연기 또는 주기적으로 전송하는 것을 선택합니다. true 일 경우 주기적으로 전송하며, false 는 연기 후, 단 한번만 전송됩니다.

 

void start()

 start() 는 timer 를 시작하는 함수로 생성된 후인 Initialized 또는 일시 정지된 Paused 상태에서만 동작합니다. 그리고 Started 상태가 됩니다.

 

void stop()

 stop() 은 timer 를 정지하는 함수입니다. 상태가 Stopped 가 되고, 해당 timer 를 다시 사용할 수 없습니다.

 

void pause()

 pause() 는 timer 를 일시 정지하는 함수입니다. 만약 주기적인 message 전송이 아니라 message 의 전송을 연기하도록 생성했다면, stop() 을 호출합니다. 그렇지 않을 경우, 내부 타이머를 멈추고, 상태를 Paused 로 변경합니다.

 

예제

 timer 의 특징을 알아보기 쉽도록 예제를 구현해보겠습니다.

 

시나리오

 채팅 중에 관리자에 의해서 사용자에게 전송되는 공지 사항을 시뮬레이션 해보도록 하겠습니다.

 

코드

#include <iostream>
#include <string>
#include <array>
#include <agents.h>

using namespace std;
using namespace Concurrency;

// 채팅 메시지 구조체
struct Message
{
	wstring		id;
	wstring		message;
};

// 채팅 메시지를 입력받는 agent.
class Typer
	: public agent
{
public:
	Typer( ITarget< Message >& buffer, const wstring id )
		: buffer( buffer )
		, isAvailable( true )
		, id( id ) { }

protected:
	void run()
	{
		while( this->isAvailable )
		{
			Message sendingMessage = { this->id };

			getline( wcin, sendingMessage.message );			

			send( this->buffer, sendingMessage );
		}		

		this->done();
	}

private:
	ITarget< Message >&		buffer;
	bool					isAvailable;
	wstring					id;
};

// 전달받은 메시지를 화면에 출력하는 agent.
class MessageDisplayer
	: public agent
{
public:
	MessageDisplayer( ISource< Message >& source )
		: source( source ) { }

protected:
	void run()
	{
		while( true )
		{
			Message receivedMessage = receive( this->source );

			wcout << L"[" << receivedMessage.id << L"]: " << receivedMessage.message << endl;
		}

		this->done();
	}

private:
	ISource< Message >&		source;
};

int main()
{
	// 시스템 로케일 정보로 설정.( 한글 깨짐 방지 )
	setlocale(LC_ALL, "");

	// 전달된 메시지를 보관하는 버퍼.
	unbounded_buffer< Message > messageBuffer;
	
	// 공지로 전달할 메시지.
	Message noticeMessage = { L"공지", L"2010년 8월 28일에 Visual Studio Camp 가 열립니다." };

	// 5초에 한 번씩 주기적으로 메시지를 전달할 timer
	timer< Message > notifier( 5000, noticeMessage, &messageBuffer, true );

	// timer 시작.
	notifier.start();

	// agent 들. 
	Typer typer( messageBuffer, L"MuMbi" );
	MessageDisplayer displayer( messageBuffer );

	array< agent*, 2 > agents = { &typer, &displayer };

	// agent 들 작업 시작.
	for_each( agents.begin(), agents.end(), []( agent* pAgent )
	{
		pAgent->start();
	} ) ;

	// agent 들 작업 종료까지 대기.
	agent::wait_for_all( agents.size(), &*agents.begin() );
}

[ 코드2. timer 를 이용한 주기적인 message 전송 예제 ]

 Typer 클래스는 사용자의 입력을 메시지 버퍼로 전송하는 역할을 하고, MessageDisplayer 클래스는 전송된 메시지들을 화면에 출력해주는 역할을 합니다.

 timer 는 5초마다 공지 사항을 메시지 버퍼로 전달합니다.

 결국 MessageDisplayer 클래스는 사용자의 메시지와 주기적으로 전달되는 메시지를 모두 화면에 출력하게 됩니다.

 

[ 그림1. timer 를 이용한 주기적인 message 전송 예제 결과 ]

[ 그림1. timer 를 이용한 주기적인 message 전송 예제 결과 ]

 

마치는 글

 이번 글을 마지막으로 AAL 에서 제공하는 모든 message block 들에 대해서 알아보았습니다.

 제공되는 message block 들은 각각 모두 사용할 상황이 다른 특징을 가지고 있고, 굉장히 유용합니다. 하지만 약간의 부족한 점들을 느끼고, 아쉬운 점이 보입니다.

 그런 점들을 보완하여 우리가 원하는 message block 을 만들 수 있습니다. 그렇게 만들어진 message block 은 스레드에 안전함을 보장해주어야 합니다. 

 다음글에서는 스레드 안전 보장을 위한 동기화 데이터 구조에 대해서 알아보도록 하겠습니다.

Asynchronous Agents Library
– message block 7. ( join & multitype_join )

작성자: 임준환( mumbi at daum dot net )

 

시작하는 글

 이 글에서는 동기화 메커니즘으로 사용할 수 있는 joinmultitype_join 에 대해서 알아보겠습니다. join 은 이전 글에서 소개한 choice 와 함께 동기화 메커니즘으로 사용할 수 있습니다.

 

join< _Type, _JType >

 join 은 연결된 모든 message block 에서 message 를 받아올 때까지 기다리는 역할을 합니다. join 의 이 역할을 수행하기 위해서는 전달 함수인 receive() 와 함께 사용해야 합니다.

 템플릿 매개변수인 _Type 은 message 의 타입입니다. 연결된 message block 들이 처리하는 message 타입으로 같은 타입이어야 합니다.

 템플릿 매개변수인 _JType 은 join 의 타입입니다. join 은 2가지 종류가 있습니다. 하나는 greedy 이고, 다른 하나는 non_greedy 입니다.

 _JType 에는 greedy 또는 non_greedy 를 사용할 수 있으며, 기본 템플릿 인자로 non_greedy 가 지정되어 있습니다.

 join 타입은 다음과 같은 enum 형으로 정의되어 있습니다.

enum join_type { 
    greedy = 0,
    non_greedy = 1 
};

[ 코드1. join 타입 ]

 두 타입의 join 모두 연결된 message block 들에서 message 를 받아올 때까지 기다리는 기능은 같습니다. 그렇다면 다른 점에 대해서 알아보겠습니다.

 기본 인자로 지정된 non_greedy 타입은 연결된 message block 들 중 어떤 message block 이 먼저 message 를 받아올 수 있는 상태가 되더라도 연결된 모든 message block 들이 모두 message 를 받아올 수 있는 상태가 될 때까지 message block 에 받아올 수 있는 message 들을 유지합니다. 연결된 모든 message block 들에서 message 를 받아올 수 있을 때가 되면 한꺼번에 message 들을 받아옵니다.

 반면 greedy 타입은 연결된 message block 들 중 어떤 message block 이 먼저 message 를 받아올 수 있는 상태가 되면 그 message 를 미리 받아옵니다. 이 때 이 message block 이 unbounded_buffer 라면 이로 인해 join 이 받아간 message 가 제거됩니다.

 

멤버 함수

 생성자와 소멸자를 제외한 public 인 멤버 함수가 없습니다. 하지만 ISource 인터페이스를 재정의한 link_target() 을 사용할 수 있으며, 이 함수는 join 과 다른 message block 을 연결하는데 사용하게 됩니다.

 가장 기본적인 생성자의 매개변수는 연결할 message block 의 개수입니다. 이 개수보다 실제로 연결된 message block 의 개수가 많으면 런타임에 에러를 발생하고, 적으면 교착상태에 빠지게 됩니다. 생성자의 매개변수로 지정된 message block 의 개수가 연결되어 message 를 받을 때까지 대기하기 때문입니다.

 

특징

 message block 이 join 을 연결할 경우 몇 가지의 특징들이 있습니다.

  • 하나의 message block 이 여러 개의 join 에 연결될 경우, 먼저 연결된 join 에 message 를 보냅니다.
  • 연결된 join 들 중 greedy join 이 있을 경우 greedy join 에게 먼저 message 를 보냅니다.
  • 연결된 join 들 중 greedy join 이 여러 개일 경우, 먼저 연결된 greedy join 에 message 를 보냅니다.

 

 또한 어떤 mesage block 들은 join 과 함께 사용할 수 없습니다.

  • single_assignment 는 단 한번만 send() 가 적용되고 이 후의 send() 는 무시되기 때문에 joinsingle_assignment 으로부터 message 를 한번만 받을 수 있다. 두 번 이상은 받을 수 없기 때문에 무한대기하게 되어 교착상태에 빠진다.
  • transformerjoin 에 연결한 후, 연결한 transformer 로 부터 message 를 받아가면 에러를 발생한다.

 

multitype_join< _TupleType, _JType >

 multitype_joinjoin 과 같은 기능을 하지만 다른 종류의 message 들을 받을 수 있습니다. 그래서 템플릿 매개변수가 message 타입이 아닌 tuple 타입을 받습니다. tuple 에 사용할 수 있는 타입은 choice 에서 사용하는 tuple 의 특징과 같습니다.

 multitype_join 의 타입은 join 과 마찬가지로 greedynon_greedy 를 사용할 수 있습니다.

 

멤버 함수

생성자와 소멸자를 제외한 public 인 멤버 함수는 없고, 인터페이스를 재정의한 멤버 함수들이 있습니다.

 

헬퍼 함수

 tuple 을 사용하는 choice 처럼 multitype_join 도 tuple 을 사용하므로 헬퍼 함수 없이는 굉장히 길고 복잡한 선언이 필요합니다.

그래서 다음과 같은 헬퍼 함수를 제공합니다.

unbounded_buffer< int > i;
unbounded_buffer< float > f;
unbounded_buffer< bool > b;

auto j = make_join( &i, &f, &b );

[ 코드2. 헬퍼 함수 make_join ]

 

예제

 join 의 특징을 살펴볼 수 있는 간략한 예제를 보겠습니다.

 

시나리오

 보통 온라인 게임에서 게임을 시작하게 되면 연결된 모든 클라이언트가 모두 로딩이 완료된 후 동시에 게임을 시작하게 됩니다. 그것을 join 을 이용해서 시뮬레이션 해보겠습니다.

 

코드

#include <iostream>
#include <array>
#include <random>
#include <agents.h>

using namespace std;
using namespace Concurrency;

const unsigned int CLIENT_COUNT = 10;

class Loader
	: public agent
{
public:
	~Loader()
	{
		this->isComplete.unlink_targets();
	}

	Loader( unsigned long id, join< bool >& synchronizer )
		: random_generator( id )
	{
		this->isComplete.link_target( &synchronizer );
	}

protected:
	void run()
	{
		this->Loading();

		send( this->isComplete, true );
		wcout << L"Load is complete." << endl;
		
		this->done();
	}

private:
	void Loading()
	{
		unsigned int loadingTime = random_generator() % 3000 + 1000;
		Concurrency::wait( loadingTime );
		wcout << L"Loading time: " << loadingTime / 1000.0f << L" sec..";
	}

private:	
	single_assignment< bool >	isComplete;
	mt19937						random_generator;
};

int main()
{	
	join< bool > synchronizer( CLIENT_COUNT );
	
	array< Loader, CLIENT_COUNT > loaderArray = {
		Loader( 1, synchronizer ),
		Loader( 2, synchronizer ),
		Loader( 3, synchronizer ),
		Loader( 4, synchronizer ),
		Loader( 5, synchronizer ),
		Loader( 6, synchronizer ),
		Loader( 7, synchronizer ),
		Loader( 8, synchronizer ),
		Loader( 9, synchronizer ),
		Loader( 10, synchronizer ),
	};

	for_each( loaderArray.begin(), loaderArray.end(), []( Loader& loader ) { loader.start(); } );

	receive( synchronizer );

	wcout << L"Game Start!" << endl;

	for_each( loaderArray.begin(), loaderArray.end(), []( Loader& loader ) { agent::wait( &loader ); } );
}

[ 코드3. join 을 이용한 동기화 예제 ]

 10개의 클라이언트 역할을 하는 agent가 있고, 각 클라이언트마다 로딩 속도가 다른 것을 시뮬레이션 하기 위해 난수를 사용하여 로딩 시간을 결정하였습니다.

 각 agent 는 완료를 나타내는 single_assignment 를 하나의 join 에 연결하여 로딩이 모두 완료될 때까지 기다립니다.

  join 에서 message 를 받는다는 것이 동기화가 완료된 것이므로 바로 게임을 시작합니다.

 

[ 그림1. join 을 이용한 동기화 예제 ]

[ 그림1. join 을 이용한 동기화 예제 ]

 

마치는 글

 이번 글에서 joinmultitype_join 에 대해서 알아보았습니다.

 choice 와 비교해서 choice 는 연결된 message block 들 중 먼저 들어온 message 가 있으면 바로 message 를 받을 수 있던 반면에 join 은 연결된 message block 들로부터 message 를 모두 받을 때까지 대기합니다.

 이 message block 들, choicejoin, multitype_join 은 Win32 API 의 WaitForMultipleObjects() 와 비슷한 기능을 한다는 것을 알 수 있습니다.

 이 message block 들의 특징은 외부 이벤트 객체가 아닌 데이터 자체가 이벤트로 바인딩되어 처리된다는 점입니다.

결론적으로 이 message block 들을 이용해 동기화를 구현할 수 있음을 알 수 있습니다.

Asynchronous Agents Library
– message block 6. ( choice )

작성자: 임준환( mumbi at daum dot net )

 

시작하는 글

 unbounded_buffer, overwrite_buffer 그리고  single_assignment 와 같이 message 를 보관하는 버퍼 기능의 message block 들과, call, transformer 와 같이 지정된 함수형 객체를 수행하는 기능의 message block 들에 대해서 알아보았습니다.

 이번 글에서는 message block 들의 상태를 기반으로 동작하는 message block 들 중 choice 에 대해서 알아보도록 하겠습니다.

 

choice< _TupleType >

 choice 의 특징은 지정된 message block 들 중 가장 먼저 messge 를 가져올 수 있는 message block 을 가리키는 기능입니다. 다시 설명하면 어떤 message 를 받을 준비가 된 message block 들을 choice 에 지정한 후, 지정된 message block 들 중 가장 먼저 message 를 받은 message block 을 choice 가 가리키게 됩니다.

 

tuple 의 조건

 choice 은 message 를 받을 준비가 된 message block 들을 지정할 때 tr1 의 tuple 을 사용합니다. 이 글의 주제가 tr1 이 아닌 만큼 tuple 에 대한 자세한 설명은 생략하겠습니다. 간략히 설명하면 std::pair 의 일반화된 객체라고 생각하시면 됩니다. std::pair 가 2개의 타입을 저장할 수 있다면 tuple 은 10개의 여러 가지 타입을 저장할 수 있습니다.

 그래서 다른 message block 과 달리 템플릿 매개변수가 message 타입이 아닌 여러 message block 들의 타입을 저장할 수 있는 tuple 의 타입입니다.

 하지만 아무 tuple 이나 사용할 수 있는 것이 아닙니다. 약간의 조건을 충족시키는 tuple 이어야만 합니다.

  • tuple 의 구성 요소로 source_type 이라는 typedef 를 가지고 있는 message block 들이어야 한다. ( message block 중 call 은 source_type 이 정의되어 있지 않기 때문에 사용할 수 없습니다. )
  • message block 들은 복사 생성자와 배정 연산자( = ) 의 사용을 금지( private )하고 있으므로 tuple 의 구성 요소로 포인터 타입을 사용해야 한다.

 

특징

 choice 는 여러 message block 들 중 가장 먼저 message 를 받은 message block 을 가리킵니다. 그 말은 choice 는 tuple 로 지정된 message block 들의 index 를 가지고 있다는 말입니다.

 그런데 그 index 를 저장하기 위한 변수로 내부적으로 single_assignment 를 사용합니다. 그 뜻은 single_assignment 는 단 한 번만 값을 저장할 수 있기 때문에 choice 는 가리키게 된 message block 의 index 를 변경할 수 없습니다. 즉, 일회성이라는 말이 됩니다.

 receive() 로 choice 의 message 를 받아오면 지정된 message block 들 중 하나라도 message 를 받을 때까지 기다리다가 message 를 받은 message block 이 생기면 그 message block 의 index 를 반환합니다.

 위의 특징을 응용하면 여러 message block 들 중 하나라도 message 를 받을 때까지 대기하는 기능으로 사용할 수 있습니다.

 

멤버 함수

 생성자와 소멸자를 제외한 public 인 멤버 함수 중 인터페이스로 재정의된 함수들을 제외한 함수들입니다.

인터페이스를 재정의한 멤버 함수들은 이전 글인 2010/07/10 - [Language Development/VC++ 10 Concurrency Runtime] - Asynchronous Agents Library – message block 1. ( 인터페이스 ) 를 참고하시기 바랍니다. 

 

bool has_value() const

 현재 message block 의 index 를 가지고 있는지 반환합니다. 내부적으로 single_assignmenthas_value() 를 호출합니다.

 

size_t index()

 가지고 있는 message block 의 index 를 반환합니다. 내부적으로 single_assignmentvalue() 를 호출하고 하고, 그 value() 는 receive() 를 사용하기 때문에 아직 index 를 가지고 있지 않다면 가질 때까지 대기하게 됩니다.

 

template <typename _Payload_type>
_Payload_type const & value()

 choice 가 가리키고 있는 index 의 message block 의 message 의 값을 반환합니다. message 의 타입을 명시적으로 호출해야 하기 때문에 사용하기 불편합니다.

 

헬퍼 함수

 choice 는 tuple 을 사용하기 때문에 선언이 복잡할 수 밖에 없습니다. 그래서 조금 더 쉽게 선언을 할 수 있도록 헬퍼 함수를 제공합니다.

unbounded_buffer< int > i;
unbounded_buffer< float > f;
unbounded_buffer< bool > b;

tuple< unbounded_buffer< int >*, unbounded_buffer< float >*, unbounded_buffer< bool >* > tp( &i, &f, &b );
choice< tuple< unbounded_buffer< int >*, unbounded_buffer< float >*, unbounded_buffer< bool >* > > ch( tp );

[ 코드1. 헬퍼 함수를 사용하지 않은 choice 선언 ]

 헬퍼 함수를 사용하지 않으면 템플릿 매개변수인 타입들을 모두 적어야 하기 때문에 굉장히 길어집니다. 특히나 message block 들의 타입이 굉장히 길기 때문에 더욱 불편합니다. typedef 를 사용하더라도 길기는 마찬가지 입니다.

 

unbounded_buffer< int > i;
unbounded_buffer< float > f;
unbounded_buffer< bool > b;

auto ch = make_choice( &i, &f, &b );

[ 코드2. 헬퍼 함수를 사용한 choice 선언 ]

 헬퍼 함수와 Visual studio 2010 에서 지원하는 C++0x 문법인 auto 를 사용하여 더욱 더 짧게 선언할 수 있습니다.

 

예제

 choice 의 특징을 쉽게 알아볼 수 있는 예제를 구현해보겠습니다.

 

시나리오

 두 agent 를 사용해서 어떤 agent 가 더 빨리 수행되는가를 육상에 비유하여 표현해보았습니다. choice 객체는 두 agent 중 빨리 수행한 쪽을 판별하는 역할을 합니다.

 

코드

#include <iostream>
#include <string>
#include <agents.h>

using namespace std;
using namespace Concurrency;

class Runner
	: public agent
{
public:
	Runner( ITarget< bool >& target, const wstring& name )
		: target( target )
		, name( name ) { }

	const wstring& GetName() const
	{
		return this->name;
	}

protected:
	void run()
	{
		for( unsigned int i = 0; i < 1000; ++i )
		{
			if( 0 == i % 100 )
				wcout << this->name << L" - " << i << L"m." << endl;
		}

		wcout << this->name << L" - " << L"Finished!" << endl;

		asend( target, true );

		this->done();
	}

private:
	ITarget< bool >&	target;
	wstring				name;
};

int main()
{
	single_assignment< bool > isFinished[2];

	auto winnerRecoder = make_choice( &isFinished[0], &isFinished[1] );

	Runner runner[2] = { Runner( isFinished[0], L"Carl Lewis" ), Runner( isFinished[1], L"Usain Bolt" ) };
	runner[0].start();
	runner[1].start();

	agent* agents[2] = { &runner[0], &runner[1] };
	agent::wait_for_all( 2, agents );

	wcout << runner[ receive( winnerRecoder ) ].GetName() << L" is winner!" << endl;
}

[ 코드3. choice 를 사용한 먼저 수행된 agent 판별 예제 ]

 두 agent 에 육상 선수인 칼루이스와 우사인 볼트의 이름을 붙여보았습니다. 각 agent 는 수행이 완료되면 완료 message 를 single_assignment 에 보내고, choice 객체는 완료 message 가 먼저 도착한 message block 의 index 를 갖게 됩니다.

 그 index 를 이용해 해당 agent 의 육상 선수 이름을 출력합니다.

 이 예제의 결과는 스케쥴링에 따라 다르기 때문에 실행할 때마다 결과가 다를 수 있습니다.

 

[ 그림1. choice 를 사용한 먼저 수행된 agent 판별 예제 ]

[ 그림1. choice 를 사용한 먼저 수행된 agent 판별 예제 ]

 

마치는 글

 이번에는 tuple 을 이용하는 choice 에 대해서 알아보았습니다. Visual studio 2010 에서 지원하는 C++0x 문법을 사용하면 좀 더 쉽게 사용할 수 있습니다.

다음 글 또한 새로운 message block 에 대해서 알아보도록 하겠습니다.

Asynchronous Agents Library
– message block 5. ( transformer )

작성자: 임준환( mumbi at daum dot net )

 

시작하는 글

 이번 글에서는 transformer 라는 message block 에 대해서 알아봅니다.

 transformer 는 지난 글에서 알아본 call 과 유사하나 좀 더 유연합니다. call 과 비교해서 보시고, calltransformer 를 언제 사용해야 할지 구분해 보시기 바랍니다.

 

transformer< _Input, _Output >

 transformercall 과 마찬가지로 지정된 함수, 함수 객체를 수행하는 역할을 합니다. 하지만 call 과는 확실히 다릅니다. 이름에서도 알 수 있듯이 message 를 변환하는 기능을 가지고 있습니다. 이 기능이 call 과 어떻게 다른지 알아보겠습니다.

 

특징

 transformer 의 특징 중 첫째는 결과를 반환한다는 것입니다. 변환하는 기능을 가지고 있기 때문에 변환된 값을 내보내 주어야 하기 때문입니다. 템플릿 매개변수를 보고 이미 눈치채신 분도 있겠지만, transformer 를 선언할 때 입력되는 매개변수의 타입과 반환 타입을 명시해주어야 합니다.

 이로 인해 내부적으로 call 과 다른 점은 반환 값을 보관하는 큐( queue ) 를 포함하고 있습니다. call 은 작업들을 순차적으로 처리하기 위한 작업 큐를 가지고 있고, transformer 는 작업 큐 뿐만 아니라 반환 값을 보관하는 큐도 가지고 있습니다.

 반환 값을 보관하는 큐의 값을 꺼내오기 위해서 receive() 나 try_receive() 를 사용할 수 있습니다. 이 특징은 message 들을 전달, 보관하는 unbounded_buffer 와 비슷합니다. 하지만 unbounded_buffer 처럼 사용하시면 안됩니다.

 그 이유는 transformer 는 두 가지 기능, 변환 함수를 수행하는 기능과 변환된 값을 내보내는 기능을 가지고 있는데 이 둘을 동시에 처리할 수 없기 때문입니다. 변환 함수를 수행하는 중인 transformer 에서 변환된 값( 반환 값 )을 얻어내려 시도해도 변환 함수를 수행 중일 때에는 변환된 값을 내어주지 않아 원하는 시기에 값을 얻어낼 수 없습니다.

 원하는 시기에 값을 제대로 얻기 위해서는 변환된 값을 unbounded_bufferoverwrite_buffer 등과 같이 message 를 보관할 수 있는 message block 에 전달한 후, 그 message block 에서 값을 얻어와야 합니다.

 두 번째 특징은 다른 message block 들과 연결하여 변환된 반환 값을 스스로 전달할 수 있다는 것입니다. ISource 인터페이스의 link_target() 을 사용하는데, 이 기능을 사용하여 절차적인 작업들을 잘게 나누어 순차적으로 처리할 수 있습니다. 이러한 메커니즘을 파이프라인( pipeline ) 이라고 합니다.

 일반적인 C++ 프로그램에서도 파이프라인 개념은 이미 많이 사용되고 있습니다.

 FunctionA( FunctionB( FunctionC() ) ) 와 같이 진행되는 처리도 파이프라인으로 볼 수 있습니다. 하지만 transformer 를 이용한 파이프라인은 이와 차이점이 있습니다. 바로 비 동기 처리가 가능하다는 것입니다.

 하나의 파이프라인의 작업이 끝날 때까지 대기하지 않기 때문에, 여러 파이프라인의 작업이 동시에 수행할 될 수 있습니다.

 잠시 후, 이런 파이프라인을 이용하는 예제를 알아보도록 하겠습니다.

 

선언

template < class _Output, class _Input >
class transformer

[ 코드1. transformer 의 선언 ]

 transformercall 처럼 함수 타입을 지정할 수 없습니다.

 transformer 는 std::tr1::function<_Output(_Input const&)> 로 함수 타입이 고정되어 있습니다.

 

예제

 transformer 를 이용하여 파이프라인을 구성하여 처리하는 예제를 구현해보도록 하겠습니다.

 

시나리오

 문자열을 꾸미는 작업을 각 단계별로 나누어 파이프라인을 구성하고, 이것을 비 동기로 처리하는 시나리오입니다.

 

코드

#include <iostream>
#include <string>
#include <agents.h>

using namespace std;
using namespace Concurrency;

// 문자열을 꾸며주는 helper class.
class DecorationHelper
{
public:
	static wstring Boiler( wstring food )
	{		
		wcout << L"boiling.." << endl;
		Concurrency::wait( 1000 );		
		return L"boiled " + food;
	}

	static wstring AddSugar( wstring food )
	{		
		wcout << L"adding sugar.." << endl;
		Concurrency::wait( 1000 );		
		return L"sweet " + food;
	}

	static wstring PutOnPlate( wstring food )
	{
		wcout << L"putting on a plate.." << endl;
		Concurrency::wait( 1000 );		
		return food + L" on a plate";
	}

	static wstring WrapUp( wstring food )
	{
		wcout << L"Wrapping the food up.." << endl;
		Concurrency::wait( 1000 );		
		return food + L".";
	}
};

int main()
{
	// 문자열 꾸밈 함수들을 수행하는 transformer message block 들.
	transformer< wstring, wstring > boiler( &DecorationHelper::Boiler );
	transformer< wstring, wstring > addSugar( &DecorationHelper::AddSugar );
	transformer< wstring, wstring > putOnPlate( &DecorationHelper::PutOnPlate );
	transformer< wstring, wstring > wrapUp( &DecorationHelper::WrapUp );

	// 최종 변환된 message 를 저장하는 message block.
	unbounded_buffer< wstring > result;

	// 꾸며진 문자열들을 다른 꾸밈 함수에 연결하여 파이프라인을 구성.
	boiler.link_target( &addSugar );
	addSugar.link_target( &putOnPlate );
	putOnPlate.link_target( &wrapUp );
	wrapUp.link_target( &result );

	// 구성된 pipeline 에 message 전달.
	asend( boiler, wstring( L"soup" ) );
	asend( boiler, wstring( L"noodle" ) );
	asend( boiler, wstring( L"water" ) );	

	// 최종 변환된 message 를 받아서 출력.
	while( true )
	{
		wcout << L"completed food: " << receive( result ) << endl;		
	}
}

[ 코드2. transformer 를 이용하여 파이프라인을 구성하고 비 동기로 처리하는 예제 ]

 문자열을 꾸미는 함수들을 각각 transformer 에 지정한 후, ISource 인터페이스인 link_target() 을 이용하여 파이프라인을 구성합니다.

 구성된 파이프라인의 첫 message block 인 boiler 객체에 인자로 사용할 문자열을 message 로 보내면 파이프라인이 비 동기로 처리됩니다.

 파이프라인의 마지막은 unbounded_buffer 를 사용하여 변환된 최종 값을 저장하고, 저장된 값들을 꺼내서 화면에 출력합니다.

 

[ 그림1. transformer 를 이용하여 파이프라인을 구성하고 비 동기로 처리하는 예제 ]

[ 그림1. transformer 를 이용하여 파이프라인을 구성하고 비 동기로 처리하는 예제 ]


 

마치는 글

  transformer 를 이용해 파이프라인을 구성하고 비 동기로 처리하는 방법을 알아보았습니다.

 지난 글에서 소개해드린 call 과는 확실히 다르다는 것을 알게 되셨을 것입니다.

 이 파이프라인 개념은 많은 곳에 적용되고 있으므로 유용하게 사용하실 수 있을 것입니다.

Asynchronous Agents Library – message block 4. ( call )

VC++ 10 Concurrency Runtime 2010. 7. 31. 17:00 Posted by 알 수 없는 사용자

Asynchronous Agents Library
– message block 4. ( call )

작성자: 임준환( mumbi at daum dot net )

 

시작하는 글

 이번 글에서는 call 이라는 message block 에 대해서 알아보겠습니다. 이번에 알아볼 call 또한 이미 알아본 message block 들과는 다른 특징이 있으니 특징을 이해하고, 필요할 때 바로 사용할 수 있어야 합니다.

 

call< _Type >

 call 은 함수 포인터와 같은 역할을 합니다. call 에 message 를 보내면 그 message 는 생성자에서 지정한 함수 포인터의 인자로 사용됩니다. 그래서 call 에 message 를 보낸다는 것은 함수를 호출하는 것과 같은 효과를 얻을 수 있습니다.

 

특징

 call 과 함수 호출의 다른 점이 있습니다. call 에 의해서 수행되는 함수를 동기 또는 비 동기로 수행되도록 선택할 수 있습니다.

 동기 전달 함수인 send() 를 사용하여 message 를 전달하면 지정된 함수의 수행이 종료될 때까지 기다립니다. 그렇기 때문에 지정된 함수의 수행이 종료되어야 해당 context 가 진행됩니다.

 반면에 비 동기 전달 함수인 asend() 를 사용하여 message 를 전달하면 지정된 함수의 수행이 종료될 때까지 기다리지 않고 호출한 context 는 계속 진행됩니다. 즉, 호출한 스레드와 call 의 함수가 동시에 진행됩니다.

 이처럼 동기와 비 동기 처리를 적절히 선택하여 구현할 수 있습니다.

 또 하나 기억해 두어야 할 특징은 하나의 call 객체에 message 를 여러 차례 보내더라도 동시에 수행되지 않습니다. 하나의 call 객체는 내부적으로 작업 큐( queue ) 를 가지고 있어 순차적으로 수행됩니다.

 하지만 하나의 call 객체가 아니라 서로 다른 call 객체는 asend() 를 이용해 동시에 수행될 수 있습니다. 서로 각각의 작업 큐를 가지고 있기 때문입니다.

 call 의 안타까운 점은 함수의 반환을 처리하지 못한다는 점입니다. 만약 지정된 함수의 결과를 얻어내야 한다면 해당 함수 내부에서 다른 message block 으로 결과를 message 로 보내야 합니다.

 call 은 생성자 이외에 public 인 멤버 함수가 없습니다.

 

선언

 사실 call 이라는 message block 의 타입은

template < class _Type, class _FunctorType = std::tr1::function< void( _Type const & ) > >
class call

[ 코드1. call 의 선언 ]

입니다.

 tr1 의 function 은 일반 함수 포인터와 멤버 함수 포인터, 함수 객체를 모두 가질 수 있으므로 기본 템플릿 매개변수인 _FunctorType 을 지정하지 않고 사용하는 것이 좋습니다. 

 위의 선언이 보여주듯이 _Type 은 수행될 함수의 매개변수의 타입입니다. bind 와 같은 어댑터 함수들과 함께 사용하여 여러 인자들을 바인딩할 수 있습니다.

 

예제

 call 의 특징들을 쉽게 알아볼 수 있는 예제를 구현해보았습니다.

 

시나리오

 Loader 라는 agent 클래스가 작업을 진행하고 call 을 이용해 진행 상황을 화면에 출력해주는 예제입니다.

 

코드

#include <iostream>
#include <agents.h>

using namespace std;
using namespace Concurrency;

class Loader
	: public agent
{
public:
	Loader( call< unsigned int >& displayDelegate )
		: displayDelegate( displayDelegate ) { }

protected:
	void run()
	{
		for( unsigned int percent = 0; percent <= 100; percent += 10 )
		{
			Concurrency::wait( 1000 );
			asend( this->displayDelegate, percent );
		}

		this->done();
	}

private:
	call< unsigned int >&		displayDelegate;
};

int main()
{
	call< unsigned int >	showLoading( []( unsigned int percent )
	{
		switch( percent )
		{
		case 0:		wcout << L"loading..";					break;
		case 100:	wcout << endl << L"complete!" << endl;	break;
		default:	wcout << L"..";							break;
		}		
	} );

	Loader loader( showLoading );

	loader.start();

	agent::wait( &loader );
}

[ 코드2. call 의 비 동기 처리를 이용한 예제 ]

 Loader agent 의 작업 진행 상황을 화면에 출력합니다. 진행률이 0 일 때는 “loading.." 을 출력하고, 이후, 진행이 완료되기 전까지 “..” 을 추가하여 진행 상황을 알리고, 완료되면 “complete!” 를 출력하여, 작업의 완료를 알립니다.

 

[ 그림1. call 의 비 동기 처리를 이용한 예제 ]

[ 그림1. call 의 비 동기 처리를 이용한 예제 ]

 

마치는 글

 이미 알아본 message block 들과는 많이 다른 call 에 대해서 알아보았습니다. call 은 함수의 수행을 동기 및 비 동기 처리로 할 수 있도록 도와주는 반면 결과를 얻기는 쉽지 않습니다.

 이러한 call 의 단점의 보완과 더 많은 특징을 가지고 있는 transformer 라는 message block 을 제공합니다.

 다음 글에서는 transformer 를 이용해 더욱 더 신나는 멀티 코어 프로그래밍을 해보도록 해보겠습니다.

Asynchronous Agents Library
– message block 3. ( overwrite_buffer & single_assignment )

작성자: 임준환( mumbi at daum dot net )

 

시작하는 글

 지난 글 unbounded_buffer 에 이어 또 다른 message block 인 overwrite_buffersingle_assignment 에 대해서 알아보도록 하겠습니다.

 Message block 들은 특징들이 모두 다르기 때문에 지난 unbounded_buffer 을 생각하시면 이해가 어려울 수 있습니다. message block 하나 하나의 쓰임새가 다르므로 새로운 것을 알아본다고 생각하시는 것이 좋을 것 같습니다.

 

overwrite_buffer< _Type >

 지금부터 설명드릴 overwrite_bufferunbounded_buffer 와는 달리 하나의 변수라고 생각하시면 이해하기 쉬울 것입니다.

 Concurrency runtime 을 사용하지 않고, 스레드 간의 상태나 정보를 공유하려면 전역 변수나 힙( heap ) 에 할당된 변수에 락( lock ) 을 걸어 사용해야 합니다.

 overwrite_buffer 는 방금 언급한 번거로운 작업들을 알아서 해줍니다. 내부에서 힙에 메모리를 할당하고, 접근 시 락을 겁니다. 하지만 사용하는 우리는 그런 것들을 신경 쓰지 않고 마치 지역 변수처럼 사용할 수 있습니다.

 unbounded_buffer 는 외부에서 message 를 받아가면 내부에서 해당 message 가 제거되는 반면에, overwrite_buffer 는 제거되지 않습니다. 또한 하나의 변수와도 같기 때문에 외부에서 message 를 보내면 이 전의 message 를 덮어쓰고 새 message 가 저장됩니다.

 결국 overwrite_buffer 는 단 하나의 message 만을 갖게 됩니다.

 그럼 overwrite_buffer 의 멤버 함수에 대해서 알아보도록 하겠습니다.

 

멤버 함수

 생성자와 소멸자를 제외한 public 인 멤버 함수들입니다.

 

bool has_value() const

 현재 message 를 가지고 있는지 반환합니다.

 어떠한 message 도 갖지 않을 경우에 false 를 반환합니다. 만약, 한번이라도 overwrite_buffer 에 message 가 전달된다면 그 후부터는 true 를 반환합니다. overwrite_buffer 는 외부에서 message 를 받아가도 내부의 message 가 제거되지 않기 때문입니다.

 Message 를 갖고 있지 않을 때, 외부에서 동기 함수인 receive() 를 사용해 message 를 얻기를 원한다면 overwrite_buffer 에 message 가 들어올 때까지 기다립니다. message 가 제거되지 않기 때문에 한번이라도 overwrite_buffer 가 message 를 받으면 receive() 가 기다리는 일은 없을 것입니다.

 

_Type value();

 현재 가지고 있는지 message 를 반환합니다.

 내부적으로 동기 전달 함수인 receive() 를 사용하므로 message 를 가지고 있지 않다면 message 를 갖게 될 때까지 기다립니다. 만약 이 때, has_value() 를 호출했다면 false 를 반환할 것입니다.

 Message 가 제거되지 않기 때문에 전달 함수를 이용해 message 를 받아갈 경우, 복사본이 전달됩니다.

 

예제

 overwrite_buffer 의 간단한 예제를 구현해보도록 하겠습니다.

 

시나리오

 네트워크 지연 시간을 갱신하고, 출력하는 프로그램을 작성할 것입니다.

 네트워크 지연 시간을 갱신하는 역할을 하는 agent 와 갱신된 정보를 출력하는 agent 가 하나의 overwrite_buffer 를 공유하여 사용하는 예제입니다.

 

코드

#include <iostream>
#include <array>
#include <agents.h>

using namespace std;
using namespace Concurrency;

// 지연 시간을 얻어오는 agent.
class PingUpdater
	: public agent
{
public:
	PingUpdater( const array< unsigned int, 5 >& delayTimeSource, ITarget< unsigned int >& targetBlock )
		: delayTimeSource( delayTimeSource )
		, targetBlock( targetBlock ) { }

protected:
	// 2초마다 지연 시간을 얻어 옴.
	void run()
	{
		while( true )
		{
			asend( this->targetBlock, this->GetDelayTime() );

			Concurrency::wait( 2000 );
		}

		this->done();
	}

	// 지연 시간을 시뮬레이션하는 함수.
	unsigned int GetDelayTime()
	{
		static unsigned int index = 0;

		unsigned int delayTime = this->delayTimeSource[ index ];

		if( index + 1 < this->delayTimeSource.size() )
			++index;		
		else
			index = 0;

		return delayTime;
	}

private:
	const array< unsigned int, 5 >&	delayTimeSource;
	ITarget< unsigned int >&		targetBlock;
};

// 지연 시간을 출력하는 agent.
class PingDisplayer
	: public agent
{
public:
	PingDisplayer( ISource< unsigned int >& sourceBlock )
		: sourceBlock( sourceBlock ) { }
protected:
	// 1초마다 지연 시간을 출력한다.
	void run()
	{
		while( true )
		{
			this->Display( receive( this->sourceBlock ) );

			Concurrency::wait( 1000 );
		}

		this->done();
	}

	// 지연 시간을 출력하는 함수.
	void Display( unsigned int delayTime )
	{
		wcout << L"current delay time: " << delayTime << endl;
	}

private:
	ISource< unsigned int >&	sourceBlock;
};

int main()
{
	// 네트워크 지연 시간의 시뮬레이션 정보.
	array< unsigned int, 5 > delayTimeSource = { 210, 211, 261, 246, 223 };

	// 공유 버퍼
	overwrite_buffer< unsigned int > delayTimeBuffer;

	// 네트워크 지연 시간을 갱신하는 agent 와 출력하는 agent.
	PingUpdater updater( delayTimeSource, delayTimeBuffer );
	PingDisplayer displayer( delayTimeBuffer );

	// agent 시작.
	updater.start();
	displayer.start();

	// agent 의 작업이 모두 끝날 때까지 대기.
	agent* waitingAgents[2] = { &updater, &displayer };
	agent::wait_for_all( 2, waitingAgents );
}

[ 코드1. overwrite_buffer 를 이용한 네트워크 지연 시간 갱신 및 출력 예제 ]

 PingUpdater 클래스는 agent 클래스로 네트워크 지연 시간을 갱신하는 역할을 합니다. 2초에 한 번씩 시뮬레이션을 위해 준비된 정보를 순회하며 얻어와서 overwrite_buffer 에 전달합니다.

 PingDisplayer 클래스도 agent 클래스로 갱신된 네트워크 지연 시간을 화면에 출력하는 역할을 합니다. 1초에 한번씩 overwrite_buffer 로부터 갱신된 정보를 가져와서 화면에 출력합니다.

 예제에서 사용된 Concurrency::wait() 는 Win32 API 의 Sleep() 과 같은 역할을 합니다. agent::wait() 과 혼동하지 않길 바랍니다.

 위 코드를 보시면 굉장히 직관적이고, 간단하게 멀티 스레드 프로그래밍을 할 수 있다는 것을 알 수 있을 것입니다.

 

[ 그림1. overwrite_buffer 를 이용한 네트워크 지연 시간 갱신 및 출력 예제 ]

[ 그림1. overwrite_buffer 를 이용한 네트워크 지연 시간 갱신 및 출력 예제 ]

 

single_assignment< _Type >

 single_assignment 는 위에서 설명한 overwrite_buffer 와 거의 흡사합니다.

 단지 다른 점이 있다면 message 를 한번만 받을 수 있다는 것입니다. 만약 두 번 이상 보낸 다면 두 번째부터는 무시됩니다.

 멤버 함수 또한 거의 같지만, 다른 점을 알아보겠습니다.

 

멤버 함수

 생성자와 소멸자를 제외한 public 인 함수들입니다.

 

bool has_value() const

 위에서 설명한 overwrite_bufferhas_value() 와 같습니다.

 Message 를 단 한번도 받지 않았다면 false 를 반환하고, 받았다면 true 를 반환합니다.

 

_Type const & value()

 overwrite_buffervalue() 와 같은 기능을 합니다.

 하지만 값을 반환하지 않고 const 참조를 반환한다는 것이 다릅니다.

 overwrite_buffervalue() 와 마찬가지로 message 를 갖고 있지 않다면 message 를 갖게 될 때까지 기다립니다.

 

마치는 글

 이번 글에서는 overwrite_buffersingle_assignment 에 대해서 알아보았습니다.

  single_assignment 는 overwrite_buffer 와 거의 흡사하기 때문에 예제는 생략하였습니다.

 지난 글에서 본 unbounded_buffer 와는 분명히 쓰임새가 다르므로 특징을 잘 파악해두시면 좋을 것입니다.

 다음 글에서 또 다른 message block 을 소개해드릴 것입니다. 그 message block 또한 쓰임새가 분명히 다르므로 Asynchronous Agents Library 의 활용도가 굉장히 넓다는 것을 아시게 될 것입니다.

Asynchronous Agents Library
– message block 2. ( unbounded_buffer )

작성자: 임준환( mumbi at daum dot net )

 

시작하는 글

 이전 글에서 message block 의 인터페이스인 ISourceITarget 인터페이스에 대해서 알아보았습니다. 이번 글부터 그 인터페이스들을 상속받아 구현한 message block 에 대해 알아보겠습니다.

 Message block 은 버퍼( buffer ) 를 가질 수도 있고, 상태만 가질 수도 있고, 기능만 가질 수도 있습니다. 그러므로 각 message block 들의 특징을 잘 파악하고, 언제 필요한지 알아야 합니다.

 이번 글에서는 가장 범용적인 유용한 unbounded_buffer 에 대해서 알아보도록 하겠습니다.

 

unbounded_buffer< _Type >

 unbounded_buffer 는 message block 중 가장 많이 사용될 것입니다. unbounded_buffer 는 내부적으로 큐( queue )를 구현하고 있어 message 저장소 역할을 합니다. 누군가 unbounded_buffer 에 message 를 보내면 unbounded_buffer 에 순서대로 차곡차곡 쌓이고, 쌓인 순서대로 꺼내서 쓸 수 있습니다. 꺼낸 message 는 unbounded_buffer 에서 제거됩니다. 그렇기 때문에 여러 곳에서 같은 message 를 꺼내 받을 수 없습니다.

 이런 작업들은 비 동기 agent 들과 사용할 때, 빛을 발합니다.

 unbounded_buffer 는 스레드에 안전하므로 직접 lock 을 하지 않아도 됩니다.

 

생성자

 

unbounded_buffer() – 기본 생성자

 빈 unbounded_buffer 를 생성합니다.

 

unbounded_buffer( filter_method const& _Filter )

 빈 unbounded_buffer 를 생성합니다. 하지만 필터 함수를 지정하여 받을 수 있는 메시지를 거를 수 있습니다.

 이 필터 함수는 bool (_Type const &) 형의 시그니처( signature )를 갖습니다.

 

멤버 함수

 

bool enqueue( _Type const& _Item )

 하나의 message 를 unbounded_buffer 에 보냅니다.

 message 전송이 성공이면 true, 아니면 false 를 반환합니다.

 내부적으로 이 함수는 message 전달 함수인 send() 를 사용합니다. 그리고 send() 의 결과를 반환합니다.

 

_Type dequeue()

 unbounded_buffer 에서 하나의 message 를 꺼냅니다. 꺼낸 message 는 큐에서 제거됩니다.

 꺼내진 message 를 반환합니다. 그리고 꺼내진 message 는 unbounded_buffer 내부에서 제거됩니다.

 enqueue() 와 마찬가지로 내부적으로 message 전달 함수인 receive() 를 사용합니다. receive() 가 반환한 값을 반환합니다.

 

예제

 unbounded_buffer 를 사용하여 작은 시나리오를 구현해보도록 하겠습니다.

시나리오

 윈도우즈 OS 는 사용자의 이벤트들을 메시지 큐에 담고, 큐에 들어온 메시지들을 순차적으로 꺼내서 처리하는 메커니즘을 사용합니다. 이 시나리오를 agent 와 unbounded_buffer 를 이용하여 간단하게 구현해보겠습니다.

코드

#include <iostream>
#include <string>
#include <agents.h>

using namespace std;
using namespace Concurrency;

// 메시지 객체
class Message
{
	wstring		message;

public:
	Message( const wstring& message )
		: message( message ) { }

	const wstring& GetMessage() const
	{
		return this->message;
	}
};

// 메시지를 발생하는 사용자 agent
class User
	: public agent
{
	ITarget< Message >&	messageQueue;

public:
	User( ITarget< Message >& target )
		: messageQueue( target ) { }

	void ClickMouseLButton()
	{
		send( this->messageQueue, Message( L"WM_LBUTTONDOWN" ) );
		send( this->messageQueue, Message( L"WM_LBUTTONUP" ) );		
	}

	void DragMouseLButton()
	{
		send( this->messageQueue, Message( L"WM_LBUTTONDOWN" ) );
		send( this->messageQueue, Message( L"WM_MOUSEMOVE" ) );
		send( this->messageQueue, Message( L"WM_LBUTTONUP" ) );
	}

	virtual void run()
	{
		this->ClickMouseLButton();
		
		Concurrency::wait( 1000 );

		this->DragMouseLButton();

		this->done();
	}
};

// 발생한 메시지들을 처리하는 메시지 펌프 agent
class MessagePump
	: public agent
{
	ISource< Message >&	messageQueue;

public:
	MessagePump( ISource< Message >& source )
		: messageQueue( source ) { }

	void ProcessMessage( const Message& message )
	{
		wcout << message.GetMessage() << endl;
	}

	virtual void run()
	{
		while( true )
		{
			Message message = receive( this->messageQueue );

			this->ProcessMessage( message );		
		}

		this->done();
	}
};

int main()
{
	// 메시지 큐
	unbounded_buffer< Message >	messageQueue;

	// 메시지를 발생하는 사용자와 메시지 펌프
	User user( messageQueue );
	MessagePump messagePump( messageQueue );

	// agent 시작.
	user.start();
	messagePump.start();	

	// agent 의 작업이 모두 끝날 때까지 대기
	agent* agents[] = { &user, &messagePump };
	agent::wait_for_all( 2, agents );
}

[ 코드1. agent 와 unbounded_buffer 를 이용한 메시지 펌프 간략 구현 ]

 Message 클래스는 단순히 문자열을 래핑( wrapping ) 하는 클래스로 큐에 저장되는 메시지를 나타냅니다.

 agent 로 2개를 정의하였는데 하나는 사용자가 이벤트 메시지를 발생하는 것을 흉내 낸 User 클래스이고, 다른 하나는 메시지 펌프를 간략화한 MessagePump 클래스입니다.

 사용자 agent( User 객체 )는 약간의 시간차를 두고 이벤트 메시지를 발생합니다. 발생된 메시지는 메시지 큐에 저장됩니다.

 메시지 펌프 agent 는 메시지가 저장될 때까지 대기하다가 메시지가 저장되면 그 메시지를 받아서 처리합니다. 처리된 메시지는 메시지 큐에서 제거됩니다.

 agent 들의 start() 를 사용하여 작업을 시작하고, 모든 agent 의 작업이 끝날 때까지 기다립니다.

 실제로는 하나의 agent 가 무한 루프를 수행하므로 프로그램이 종료되지 않습니다.

 위의 예제처럼 데이터를 보내고, 순차적으로 받아서 처리하고 싶을 때, 내부적으로 큐가 필요할 때 유용한 message block 이 바로 unbounded_ buffer 입니다.

 멀티 스레드 프로그래밍 시 자료 구조 중 큐가 많이 사용되기 때문에 unbounded_buffer 도 많이 사용하게 될 것입니다.

 

[ 그림1. agent 와 unbounded_buffer 를 이용한 메시지 펌프 간략 구현 결과 ]

[ 그림1. agent 와 unbounded_buffer 를 이용한 메시지 펌프 간략 구현 결과 ]

 

마치는 글

 이번 글에서는 message block 중 가장 사용도가 높은 unbounded_buffer 에 대해서 알아보았습니다. unbounded_buffer 이외에도 다양한 message block 이 있습니다.

 다음 글에서는 overwrite_buffer 라는 message block 에 대해서 알아보도록 하겠습니다.

Asynchronous Agents Library

– message block 1. ( 인터페이스 )

작성자: 임준환( mumbi at daum dot net )

 

시작하는 글

 이전 글까지 Asynchronous Agents Library( 이하, AAL ) 의 일부인 agent 와 message 전달 함수에 대해 알아보았습니다. agent 만 알아도 어느 정도 비 동기 처리를 쉽게 구현할 수 있습니다.

 이번 글에서는 agent 간 소통을 할 수 있는 message block 들에 대해서 알아보겠습니다. message block 을 이용하면 agent 간 데이터 또는 상태 동기화를 할 수 있습니다.

 AAL 은 스레드로부터 안전한 방식으로 구현되었고, 추상화되었습니다. 그래서 agent 객체와 message block 을 이용한 동기화 로직이 직관적이고 쉽게 흐름을 파악할 수 있어 데드락( dead-lock ) 을 방지하기 용이합니다.

 그럼 지금부터 agent 를 이용한 비 동기 처리에 날개를 달아주는 message block 에 대해 알아보도록 하겠습니다.

 

Message 객체

 예전 글부터 message, message 메커니즘, message 전달 함수, message block 등을 언급하면서 항상 message 란 개념을 사용했습니다.

 이 개념은 실제 클래스로 존재합니다. 하지만 단지 message 를 래핑( wrapping ) 할 뿐, 전혀 다른 기능을 가지고 있지 않은 클래스입니다.

 한 가지 기능이 있다면 식별자( id )를 갖는다는 것입니다. message 클래스는 Concurrency Runtime 의 _Runtime_object 클래스를 상속 받습니다. 이 클래스는 Runtime 에 의해 생성될 때 자동으로 id 를 갖게 됩니다. 이 id 를 알아보는 함수는 msg_id() 입니다. 이 메서드의 접근자가 public 으로 되어 있어 message 클래스에서도 사용 가능합니다.

 이 msg_id() 가 반환한 값은 message block 에서 사용되는 runtime_object_identity 형입니다. 몇몇 message block 메서드의 runtime_object_identity 형의 매개변수에 인자로 사용할 수 있습니다.

 사실, 직접 message block 을 구현하지 않는 한, message 클래스는 직접 사용할 경우는 없을 것입니다. 우리는 보내고 받는 데이터를 공급하면 내부적으로 그 데이터를 message 클래스로 래핑하고 message block 내부에서 사용하게 되는 것입니다. 그러므로 크게 신경쓰지 않아도 됩니다.

 

Source 와 target

 Message block 은 크게 두 가지 종류로 나눌 수 있습니다. 하나는 source 이고 다른 하나는 target 입니다.

 Message block 에서의 source 는 message 를 보낼 message block 을 일컫습니다. 마찬가지로 target 은 message 를 받을 message block 을 뜻합니다.

ISource 인터페이스

 Source 는 AAL 의 하나의 개념이지만, 이것을 인터페이스로 추상화 하였습니다. 이것이 ISource 인터페이스입니다.

 그러므로 source 로 쓰일 message block 들은 ISource 인터페이스를 상속하여 구현되었습니다. 만약 직접 source 로 사용될 message block 을 구현하신다면 ISource 인터페이스를 상속해야 합니다.

- ISource 인터페이스의 선언

template<   class _Type>class ISource;

[ 코드1. ISource 인터페이스의 선언 ]

 템플릿 매개변수인 _Type 은 message 로 쓰일 데이터 형( type )입니다. _Type 은 public typedef 인 source_type 으로 사용할 수 있습니다.

 

- ISource 인터페이스의 메서드

virtual void link_target(ITarget<_Type> * _PTarget) = 0;

[ 코드2. ISource::link_target() ]

 link_target() 은 target 인 message block 과 연결합니다. 여기서 연결의 의미는 자동으로 전달된다는 의미로 생각하시면 되겠습니다.

 즉, 이 ISource 를 상속받은 message block 에 link_target() 으로 target message block 을 연결했을 경우, 이 message block 의 message 들은 직접 전달 함수를 사용하지 않아도 자동으로 target message block 으로 전달됩니다.

 연결할 target 은 여러 개일 수 있습니다. 그러나 ISource 를 상속한 message block 의 구현에 따라 첫 번째 target 만 동작할 수도 있습니다. 예로 unbounded_buffer 가 있습니다. unbounded_buffer 는 내부적으로 큐를 구현하고 있어 전달 후, message 가 큐에서 제거되므로 두 번째 연결된 target 이 있더라도 message 를 보낼 수 없습니다.

 매개변수인 _PTarget 은 연결할 target message block 입니다. _PTarget 의 데이터 형인 ITarget 은 target 을 추상화한 인터페이스입니다. 곧 설명하도록 하겠습니다.

 _PTarget 이 NULL 이라면 invalid_argument 예외가 발생합니다.

 

virtual void unlink_target(ITarget<_Type> * _PTarget) = 0;

[ 코드3. ISource::unlink_target() ]

 unlink_target() 은 link_target() 으로 연결된 target 들 중 매개변수인 _PTarget 에 지정된 target 의 연결을 해제합니다.

 _PTarget 이 NULL 이라면 invalid_argument  예외가 발생합니다. 또한 _PTarget 이 연결된 target 들 중에 없다면 아무 것도 하지 않습니다.

 

virtual void unlink_targets() = 0;

[ 코드4. ISource::unlink_targets() ]

 unlink_targets() 는 연결된 모든 target 들의 연결을 해제합니다.

 

virtual message<_Type> * accept(runtime_object_identity _MsgId, ITarget<_Type> * _PTarget) = 0;

[ 코드5. ISource::accept() ]

 accept() 는 target 에서 호출되지만, source 가 제공합니다. source 의 message 를 수락하고, 소유권이 이전됩니다.

 매개변수인 runtime_object_identity 는 message 객체의 msg_id() 로 얻을 수 있습니다. 실제로 runtime_object_identity __int32 를 typedef 한 것이고, Concurrency Runtime 에서 객체를 생성할 때 지정되는 고유의 번호입니다.

 다른 매개변수인 _PTarget 은 message 를 수락하는 target 입니다.

 수락된 message 가 반환됩니다.

 

virtual bool reserve(runtime_object_identity _MsgId, ITarget<_Type> * _PTarget) = 0;

[ 코드6. ISource::reserve() ]

 reserve() 는 message 를 예약합니다. 예약을 성공한 message 는 message 를 얻기 위한 comsume() 이나 예약을 해제 위한 release() 를 호출해야 합니다.

 매개변수는 위의 accept() 와 같습니다.

 예약에 성공한 경우 true 를, 실패한 경우 false 를 반환합니다. 실패할 수 있는 이유는 다양합니다. 이미 예약되었거나, 구현한 message block 의 특징에 따라 예약에 실패할 수 있습니다.

 

virtual message<_Type> * consume(runtime_object_identity _MsgId, ITarget<_Type> * _PTarget) = 0;

[ 코드7. ISource::consume() ]

 consume() 은 위의 accept() 와 비슷합니다.

 다른 점이 있다면 reserve() 를 호출해 true 를 반환했을 때에만 consume() 을 호출해야 합니다. 보통 reserve() 를 호출하지 않았거나 _Ptarget 이 예약된 target 과 다를 경우 bad_target 예외가 발생합니다.

 매개변수는 위의 accept() 와 같습니다.

 

virtual void release(runtime_object_identity _MsgId, ITarget<_Type> * _PTarget) = 0;

[ 코드8. ISource::release() ]

 release() 는 예약된 것을 해제합니다.

 매개변수는 accept() 와 같습니다.

 

virtual void acquire_ref(ITarget<_Type> * _PTarget) = 0;

[ 코드9. ISource::acquire_ref() ]

 acquire_ref() 는 참조 개수를 증가시킵니다. 현재 link_target() 으로 연결된 target 에서 호출됩니다.

 매개변수인 _PTarget 은 link_target() 으로 연결된 target 입니다.

 

virtual void release_ref(ITarget<_Type> * _PTarget) = 0;

[ 코드10. ISource::release_ref() ]

 release_ref() 는 참조 개수를 감소시킵니다. 현재 link_target() 으로 연결된 target 에서 호출됩니다.

매개변수인 _PTarget 은 link_target() 으로 연결된 target 입니다.

 

ITarget 인터페이스

 Target 또한 source 와 마찬가지로 AAL 의 하나의 개념이지만, 이것을 추상화 하였습니다. 이것이 ITarget 인터페이스 입니다.

 Target 으로 사용할 message block 을 구현하신다면 ITarget 인터페이스를 상속해야 합니다.

- ITarget 인터페이스의 선언

template<   class _Type>class ITarget;

[ 코드11. ITarget 인터페이스의 선언 ]

 템플릿 매개변수인 _Type 은 message 로 사용될 데이터 형입니다. _Type 은 public typedef 인 type 으로 사용할 수 있습니다.

 Target 은 필터를 지정할 수 있습니다. 그래서 필터 함수의 시그니처( signature )인 bool ( _Type const & ) 를 typedef std::tr1::function<bool(_Type const&)> filter_method 로 정의되어 있습니다.

- ITarget 인터페이스의 메서드

virtual message_status propagate(message<_Type> * _PMessage, ISource<_Type> * _PSource) = 0;

[ 코드12. ITarget::propagate() ]

 propagate() 는 지정된 source 로부터 해당 message 를 비 동기 방식으로 가져옵니다.

 매개변수인 _PMessage 는 가져올 message 이고, _PSource 는 보내는 message block 입니다. _PMessage 나 _PSource 가 NULL 일 경우, invalid_argument 예외를 발생할 수 있습니다.

 Message 의 전달이 성공 또는 실패 등의 message 상태를 반환합니다.

 

virtual message_status send(message<_Type> * _PMessage, ISource<_Type> * _PSource) = 0;

[ 코드13. ITarget::send() ]

 send() 는 지정된 source 로부터 해당 message 를 동기 방식으로 가져옵니다.

 매개변수는 propagate() 와 같고, 예외 또한 같습니다.

 Message의 생성 이외에 네트워크와 함께 사용할 경우, 데드락( dead lock ) 을 초래할 수 있습니다.

 

virtual void link_source(ISource<_Source_type> * _PSource)

[ 코드14. ITarget::link_source() ]

 link_source() 는 source 의 link_target() 에 대응되는 함수로 지정된 source 를 연결합니다.

 하지만, 이 함수는 target 에서 호출하면 안되고, source 에서 link_target() 와 함께 호출하여 서로 연결되어야 합니다.

 

virtual void unlink_source(ISource<_Source_type> * _PSource)

[ 코드15. ITarget::unlink_target() ]

 unlink_source() 는 unlink_target() 에 대응되는 함수로 지정된 source 와의 연결을 해제합니다.

 하지만, link_source() 와 마찬가지로 target 에서 호출하면 안되고, source 에서 unlink_target()나 unlink_targets() 와 함께 호출하여 서로 연결을 해제해야 합니다.

 

virtual void unlink_sources()

[ 코드16. ITarget::unlink_sources() ]

 unlink_sources() 는 unlink_targets() 에 대응되는 함수로 지정된 source 들과의 연결을 모두 해제합니다.

 

마치는 글

 이번 글에서는 message block 구현 시, 상속해야 할 인터페이스인 ISourceITarget 에 대해서 알아보았습니다.

 실제로 ISourceITarget 의 메서드들을 직접 호출하는 경우는 거의 없으며, message block 내부에서 사용됩니다.

 Message 를 전달하기 위해서는 위 인터페이스들의 메서드들보다 message 전달 함수들을 많이 사용합니다.

 이번에 소개한 인터페이스들의 구현 클래스들에 대해 아직 소개하지 않았고, 이 인터페이스들의 메서드들이 내부적으로 사용되지만 사용자가 직접 호출할 경우가 드물기 때문에 예제를 작성하지 않았습니다.

 다음 글에서 위 인터페이스들을 구현한 구현 클래스들에 대해서 살펴보고 예제를 보도록 하겠습니다.