Asynchronous Agents Library
– message 전달 함수. 2 ( 수신 )

작성자: 임준환( mumbi at daum dot net )

 

Message 수신

 Message 를 message block 에 전송할 수 있듯이, message block 으로부터 수신할 수도 있습니다. message 수신 함수에도 전송 함수와 마찬가지로 동기 함수인 receive() 와 비 동기 함수인 try_receive() 가 있습니다.

 

동기 함수 receive()

 동기 함수인 receive() 는 message block 으로부터 수신이 완료될 때 수신된 message 를 반환합니다. 만약 message block 에 어떠한 message 도 없다면 receive() 는 message block 에 수신할 message 가 있을 때까지 기다립니다.

 아래는 receive() 의 선언입니다.

template <
   class _Type
>
_Type receive(
   ISource<_Type> * _Src,
   unsigned int _Timeout = COOPERATIVE_TIMEOUT_INFINITE
);

template <
   class _Type
>
_Type receive(
   ISource<_Type> * _Src,
   filter_method const& _Filter_proc,
   unsigned int _Timeout = COOPERATIVE_TIMEOUT_INFINITE
);

template <
   class _Type
>
_Type receive(
   ISource<_Type> &_Src,
   unsigned int _Timeout = COOPERATIVE_TIMEOUT_INFINITE
);

template <
   class _Type
>
_Type receive(
   ISource<_Type> &_Src,
   filter_method const& _Filter_proc,
   unsigned int _Timeout = COOPERATIVE_TIMEOUT_INFINITE
);

[ 코드1. receive() 의 선언 ] 

템플릿 매개변수인 _Type 은 message 의 자료 형입니다.

 함수 매개변수 중 _Src 는 message block 의 인터페이스 중 하나인 ISource 를 상속한 message block 객체이며, 이 객체로부터 message 를 수신합니다.

 함수 매개변수 중 _Timeout 은 최대 대기 시간입니다. 이것은 receive() 가 동기 함수이기 때문에 영원히 기다릴 상황을 대비하는 방법입니다. 이 매개변수를 지정했을 때, 최대 대기 시간을 초과하였을 경우, agent::wait()( Asynchronous Agents Library – agent. 2 ( 기능 ) 참고 ) 와 마찬가지로 operation_timed_out 예외를 발생합니다. 그러므로 이 매개변수를 지정 시 반드시 해당 예외를 처리해주어야 합니다. 기본 인자로 COOPERATIVE_TIME_INFINITE 가 지정되어 있으며, 무한히 기다리는 것을 의미합니다.

 함수 매개변수 중 _Filter_proc 는 message 를 거부할 수 있는 필터입니다. message block 생성자로 지정할 수 있는 필터와 마찬가지로 std::tr1::function<bool(_Type const&)> 입니다.

 Message 의 수신이 완료되면 해당 message 를 반환합니다.

예제

- 수신할 message 가 있는 경우

#include <iostream>
#include <string>
#include <agents.h>

using namespace std;
using namespace Concurrency;

int main()
{
	// message block
	unbounded_buffer< wstring > message_block;

	wstring send_message( L"first message!" );
	send( message_block, send_message );

	wstring receive_message;

	try
	{
		receive_message = receive( message_block, 2000 );
	}
	catch( operation_timed_out& e )
	{
		wcout << L"operation_timed_out exception" << e.what() << endl;
	}

	wcout << L"received message: " << receive_message << endl;
}

[ 코드2. receive() 의 수신할 message 가 있는 경우 예제 ]

 receive() 의 매개변수로 최대 대기 시간을 지정했지만 message block 에 message 가 존재하기 때문에 수신하고 바로 반환합니다.

[ 그림1. receive() 의 수신할 message 가 있는 경우 예제 실행 결과 ]

[ 그림1. receive() 의 수신할 message 가 있는 경우 예제 실행 결과 ]


- 수신할 message 가 없는 경우

#include <iostream>
#include <string>
#include <agents.h>

using namespace std;
using namespace Concurrency;

int main()
{
	// message block
	unbounded_buffer< wstring > message_block;

	// wstring send_message( L"first message!" );
	// send( message_block, send_message );

	wstring receive_message;

	try
	{
		receive_message = receive( message_block, 2000 );
	}
	catch( operation_timed_out& e )
	{
		wcout << L"operation_timed_out exception" << e.what() << endl;
	}

	wcout << L"received message: " << receive_message << endl;
}

[ 코드3. receive() 의 수신할 message 가 없는 경우 예제 ]

 send() 를 주석 처리하여 message block 에 전달한 message 가 없기 때문에 receive() 는 지정된 최대 대기 시간인 2초( 2000 milli second ) 동안 기다린 후, operation_timed_out 예외가 발생합니다.

 이 예외를 처리해야 정상적으로 프로그램이 진행됩니다.

 만약 최대 대기 시간을 지정하지 않았다면 무한 대기하게 됩니다.

[ 그림2. receive() 의 수신할 message 가 없는 경우 예제 실행 결과 ]

[ 그림2. receive() 의 수신할 message 가 없는 경우 예제 실행 결과 ]


 

비 동기 함수 try_receive()

비 동기 함수인 try_receive() 는 message 가 수신될 때까지 기다리지 않습니다. 만약 수신할 message block 에 어떠한 message 도 없다고 하더라도 기다리지 않고, 바로 반환됩니다.

아래는 try_receive() 의 선언입니다.

template <
   class _Type
>
bool try_receive(
   ISource<_Type> * _Src,
      _Type & _value
);

template <
   class _Type
>
bool try_receive(
   ISource<_Type> * _Src,
      _Type & _value,
   filter_method const& _Filter_proc
);

template <
   class _Type
>
bool try_receive(
   ISource<_Type> & _Src,
      _Type & _value
);

template <
   class _Type
>
bool try_receive(
   ISource<_Type> & _Src,
      _Type & _value,
   filter_method const& _Filter_proc
);

[ 코드4. try_receive() 의 선언 ]

 템플릿 매개변수인 _Type 은 receive() 와 마찬가지로 message 의 자료 형입니다.

 함수 매개변수 중 _Src 도 receive() 와 마찬가지로 message block 의 인터페이스 중 하나인 ISource 를 상속한 message block 객체이며, 이 객체로부터 message 를 수신합니다.

 함수 매개변수 중 _value 는 수신한 message 를 저장할 변수의 참조입니다. 수신이 성공하면 message 는 이 참조가 가리키는 변수에 저장됩니다.

 함수 매개변수 중 _Filter_proc 는 receive() 와 마찬가지로 message 를 거부할 수 있는 필터입니다.

 try_receive() 는 수신의 완료를 기다리지 않기 때문에 수신을 시도했을 때( try_receive() 를 호출했을 때 ) message block 에 어떠한 message 도 없다면 false 를 반환해 알려줍니다. message 가 있다면 true 를 반환합니다.

 만약, 수신 시도를 하자마자 시도한 컨텍스트가 계속 진행되기를 원한다면 receive() 에 _Timeout 매개변수에 0 을 지정하기 보다는 try_receive() 를 사용하는게 바람직합니다.

예제

- 수신할 message 가 있는 경우

#include <iostream>
#include <string>
#include <agents.h>

using namespace std;
using namespace Concurrency;

int main()
{
	// message block
	unbounded_buffer< wstring > message_block;

	send( message_block, wstring( L"first message!" ) );

	wstring received_message;
	if( try_receive( message_block, received_message ) )
	{
		wcout << L"receive success" << endl;
		wcout << L"received message: " << received_message << endl;
	}
	else
	{
		wcout << L"receive fail" << endl;
	}
}

[ 코드5. try_receive() 의 수신할 message 가 있는 경우 예제 ]

 try_receive() 는 비 동기 함수이기 때문에 수신할 message 가 있든 없든 먼저 반환됩니다. 수신할 message 가 있으면 true 를 반환하고 인자인 참조 변수에 수신한 message 를 저장합니다,

[ 그림3. try_receive() 의 수신할 message 가 있는 경우 예제 실행 결과 ]

[ 그림3. try_receive() 의 수신할 message 가 있는 경우 예제 실행 결과 ]


- 수신할 message 가 없는 경우

#include <iostream>
#include <string>
#include <agents.h>

using namespace std;
using namespace Concurrency;

int main()
{
	// message block
	unbounded_buffer< wstring > message_block;

	// send( message_block, wstring( L"first message!" ) );

	wstring received_message;
	if( try_receive( message_block, received_message ) )
	{
		wcout << L"receive success" << endl;
		wcout << L"received message: " << received_message << endl;
	}
	else
	{
		wcout << L"receive fail" << endl;
	}
}

[ 코드6. try_receive() 의 수신할 message 가 없는 경우 예제 ]

 동기 함수인 receive() 와는 달리 비 동기 함수인 try_receive() 는 수신할 message 가 없을 경우, 기다리지 않고 false 를 반환합니다.

 수신할 message 가 있든 없든 바로 반환해야 하는 경우라면 receive() 의 매개변수인 최대 대기 시간을 0 으로 지정하는 것보다는 try_receive() 를 권장합니다.

 receive() 의 최대 대기 시간은 예외 메커니즘을 사용하므로 try_receive() 에 비해 오버헤드가 있을 수 있습니다.

[ 그림4. try_receive() 의 수신할 message 가 없는 경우 예제 실행 결과 ]

[ 그림4. try_receive() 의 수신할 message 가 없는 경우 예제 실행 결과 ]


 

Message 필터

 지난 글에서 message block 에 필터를 지정할 수 있다고 언급했습니다. message block 에 지정되는 필터는 message block 에 전송 시에 적용됩니다.

 마찬가지로 수신 함수들에도 필터를 지정할 수 있습니다. message block 으로부터 수신 시에 적용됩니다.

 동기 함수인 receive() 는 수신할 message 가 필터에 의해 수락될 때까지 대기합니다. 즉, 수신할 message 가 필터에 의해 거부된다면 message block 에 message 가 없을 때와 같습니다.

 비 동기 함수인 try_receive() 또한 message block 에 message 가 없을 때와 마찬가지로 false 를 반환합니다.

 다시 말해, message block 에 message 가 없는 경우에도 필터에 의해 거부된다는 말과 같습니다.

 그럼, 필터를 이용한 수신 함수에 대한 예제를 살펴보겠습니다.

예제

#include <iostream>
#include <vector>
#include <iterator>
#include <functional>
#include <agents.h>
#include <ppl.h>

using namespace std;
using namespace std::tr1;
using namespace Concurrency;

class number_collector
	: public agent
{	
public:
	number_collector( ISource< int >& source, vector< int >& result, function< bool ( int ) > filter )
		: source( source )
		, result( result )
		, filter( filter ) { }
	
protected:
	void run()
	{
		while( true )
		{
			int number = receive( this->source, this->filter );

			if( 0 == number )
				break;

			this->result.push_back( number );
		}

		this->done();
	}

private:
	ISource< int >&				source;
	vector< int >&				result;
	function< bool ( int ) >	filter;
};

int main()
{
	// message block
	unbounded_buffer< int > message_block;

	// send number 1 ~ 10.
	parallel_for( 1, 11, [&]( int number )
	{
		send( message_block, number );
	} );

	// send stop signal.
	send( message_block, 0 );	// for even.
	send( message_block, 0 );	// for odd.

	vector< int > even_number_array, odd_number_array;

	number_collector even_number_collector( message_block, even_number_array, []( int number ) -> bool
	{
		return 0 == number % 2;
	} );

	number_collector odd_number_collector( message_block, odd_number_array, []( int number ) -> bool
	{
		if( 0 == number )
			return true;

		return 0 != number % 2;
	} );

	even_number_collector.start();
	odd_number_collector.start();

	// wait for all agents.
	agent* number_collectors[2] = { &even_number_collector, &odd_number_collector };
	agent::wait_for_all( 2, number_collectors );

	// print
	wcout << L"odd numbers: ";
	copy( odd_number_array.begin(), odd_number_array.end(), ostream_iterator< int, wchar_t >( wcout, L" " ) );

	wcout << endl << L"even numbers: ";
	copy( even_number_array.begin(), even_number_array.end(), ostream_iterator< int, wchar_t >( wcout, L" " ) );

	wcout << endl;
}

[ 코드7. 필터를 이용한 숫자 고르기 예제 ]

 우선 message block 에 1 ~ 10 의 정수를 전송합니다. parallel_for() 를 사용하였는데 이 함수는 Concurrency Runtime 위에서 AAL 과 작동하는 돌아가는 Parallel Patterns Library( 이하, PPL ) 에서 제공하는 함수입니다. PPL 에 대한 자세항 사항은 visual studio 팀 블로그에서 확인하실 수 있습니다.

 parallel_for() 는 반복될 내용을 병렬로 처리하기 때문에 성능에 도움을 줍니다. 그러나 반복되는 순서를 보장하지 않습니다.

 그래서 1 ~ 10 의 정수가 전송되는 순서는 알 수 없습니다. 하지만 1 ~ 10 의 정수를 모두 전송한 뒤, 0을 보내서 마지막 message 라는 것을 알려주었습니다. 두 번 보낸 0 중 하나는 짝수를 수신하는 agent 를 위한 것이고, 하나는 홀수를 수신하는 agent 를 위한 것입니다.

 사실, 이런 처리 로직을 구성할 때에는 상태 변화 알림에 유용한 다른 message block 을 사용하는 것이 좋지만 아직 message block 에 대해서 설명하지 않았기 때문에 혼란을 줄이기 위해 간단한 unbounded_buffer 하나만으로 처리하였습니다.

 위 코드에 정의된 agent 인 number_collector 는 message block 으로부터 필터에 의해 필터링된 message 를 컨테이너에 저장합니다.

 동기 함수인 receive() 를 사용했기 때문에 원하는 message 가 올 때까지 기다립니다. 이로 인해 필요한 만큼의 최소의 반복을 하여 오버헤드가 줄어 듭니다.

 만약 비 동기 함수인 try_receive() 를 사용했다면 쓸모 없는 반복 오버헤드를 발생시킬 것입니다. 이 예제의 경우에는 동기 함수인 receive() 가 적합합니다.

 정의된 agent 를 짝수용과 홀수용을 선언하고 start() 를 사용하여 작업을 시작합니다. 그리고 wait_for_all() 을 사용하여 두 agent 가 모두 끝날 때까지 기다린 후, 모든 작업이 종료되면 화면에 수집한 정수들을 출력합니다.

  위 예제 코드는 Visual studio 2008 부터 지원하는 tr1function 과 visual studio 2010 부터 지원하는 C++0x 의 람다를 사용하였습니다. Concurrency Runtimetr1, C++0x 등의 visual studio 2010 의 새로운 feature 들을 사용하여 구현되었기 때문에 이것들에 대해 알아두는 것이 좋습니다.

[ 그림5. 필터를 이용한 숫자 고르기 예제 실행 결과 ]

[ 그림5. 필터를 이용한 숫자 고르기 예제 실행 결과 ]


 

마치는 글

 이 글에서는 message 전달 함수 중 수신 함수인 receive() 와 try_receive() 에 대해서 알아보았습니다.

 receive() 와 try_receive() 는 사용해야 할 상황이 분명히 다르니 상황에 따라 사용에 유의해야 합니다.

 다음 글에서는 message 가 저장되는 message block 에 대해서 알아보도록 하겠습니다.

Asynchronous Agents Library
– message 전달 함수. 1 ( 전송 )

작성자: 임준환( mumbi at daum dot net )

 

Message 메커니즘

 Asynchronous Agents Library( 이하, AAL ) 에서 message 메커니즘이란 agent 들 간의 데이터 교환이나 동기화 등 상호 작용을 위해 사용되는 기능입니다.

 Message 메커니즘은 크게 message 를 보내고( send ) 받는( receive ) 전달 함수( passing function )와 message 들을 관리하거나 message 에 특별한 기능을 부여하는 message block 으로 구성되어 있습니다.

 실질적으로 message block 이 다루는 message 란 빌트인 자료 형( built-in type )이나 클래스와 같은 사용자 정의 자료 형( user define type ) 의 데이터입니다.

 이 글에서는 먼저 message 를 주고 받는 전달 함수 중 전송 함수에 대해서 알아보겠습니다.

 

Message 전송

 Message 전달 함수 중 보내는 기능을 하는 함수에는 동기 전송 함수 send() 와 비 동기 전송 함수 asend(), 이렇게 두 가지가 있습니다.

 동기와 비 동기라는 용어가 혼란스러울 수 있기 때문에 잠깐 언급하고 넘어가겠습니다.

 여기서 쓰이는 동기( synchronous )라는 용어는 병렬 처리에서 쓰이는 동기화( synchronization )라는 용어와는 약간은 다른 개념입니다.

 동기화는 다른 두 시간을 하나로 일치시킨다는 뜻으로 행위를 말합니다. 반면 동기는 이미 동기화되었다는 뜻으로 상태를 뜻합니다. 마찬가지로 비 동기는 동기화되지 않았다는 뜻입니다.

 즉, 보통 프로그래밍에서 동기 함수란 그 함수가 호출되고, 그 함수가 반환될 때까지 해당 컨텍스트가 진행되지 않고 기다리다가 반환되고 나서야 컨텍스트가 진행되는 함수를 말합니다. 이것은 사실 컨텍스트가 기다리는 것이 아니라, 해당 컨텍스트가 함수의 내용을 직접 처리하기 때문에 함수를 호출한 입장에서 보면 기다리는 것처럼 보이는 것입니다.

 마찬가지로 비 동기 함수는 함수를 호출한 컨텍스트가 직접 함수의 내용을 처리하지 않고 새로운 작업 스레드를 생성하고 생성된 스레드의 컨텍스트가 진행되기 때문에 함수를 호출한 컨텍스트는 함수를 호출하자마자 함수의 반환을 받고, 계속해서 진행되는 것입니다. 함수를 호출한 컨텍스트는 이러한 비 동기 함수가 언제 실제로 종료될지 모르기 때문에 함수의 반환이 아닌 다른 기법이 필요합니다. 보통 폴링( polling )이나 메시지 또는 콜백 함수와 같은 기법을 사용하여 함수의 종료를 알 수 있습니다.

 그럼 이제 본격적으로 두 message 전달 함수에 대해서 알아 보겠습니다.

 

동기 전송 함수 send()

 앞에서 설명한 것처럼 send() 는 동기 함수이기 때문에 message 가 전송에 대한 결과가 확실해 졌을 때 반환됩니다. 즉, 전송된 결과가 확실할 때까지 기다린다는 뜻입니다.

  send() 의 선언은 다음과 같습니다.

template <
   class _Type
>
bool send(
   ITarget<_Type> * _Trg,
   const _Type& _Data
);

template <
   class _Type
>
bool send(
   ITarget<_Type> &_Trg,
   const _Type &_Data
);

[ 코드1. send() 의 선언 ]

 템플릿 매개변수인 _Type 은 전송할 message 의 자료 형입니다.

 함수 매개변수 중 _Trg 는 message block 의 인터페이스 중 하나인 ITarget 을 상속받은 message block 객체이며, 전송될 message 를 받게 됩니다. 나중에 message block 에 대해서 자세히 언급할 예정입니다

 또 다른 함수 매개변수인 _Data 가 바로 전송할 message 입니다.

 send() 가 message 전송에 성공했으면 true 를, 그렇지 않으면 false 를 반환합니다.

예제

#include <agents.h>

using namespace Concurrency;

int main()
{
	// message block
	unbounded_buffer< int > message_block;

	send( message_block, 1 );	
}

[ 코드2. send() 예제 ]

 아직 message block 에 대해서 설명하지 않았지만 예제를 위해 message block 중 하나인 unbounded_buffer 를 사용하였습니다.

 message_block 에 1 을 message 로 전송하는 코드입니다. 아래의 캡처 그림을 통해 message 가 message block 에 전송된 것을 확인할 수 있을 것입니다.

 아직 수신 함수를 설명하지 않았기 때문에 코드의 실행 결과가 아닌 코드의 디버깅 화면을 캡처한 그림으로 대신하겠습니다.

[ 그림1. send() 예제 디버깅 화면 ]


 

비 동기 함수 asend()

 asend() 는 비 동기 함수입니다. 즉, 전송이 완료되기 전에 반환됩니다.

 send() 는 전송 결과를 반환하는 반면, asend() 는 전송 결과가 아닌 message 를 받는 message block 이 전송을 수락했는지 아닌지를 반환합니다.

 아래는 asend() 의 선언입니다.

template <
   class _Type
>
bool asend(
   ITarget<_Type> * _Trg,
   const _Type& _Data
);

template <
   class _Type
>
bool asend(
   ITarget<_Type> &_Trg,
   const _Type &_Data
);

[ 코드3. asend() 의 선언 ]

 템플릿 매개변수와 함수 매개변수는 모두 send() 와 같습니다.

예제

#include <agents.h>

using namespace Concurrency;

int main()
{
	// message block
	unbounded_buffer< int > message_block;

	asend( message_block, 1 );

	Concurrency::wait( 10 );
}

[ 코드4. asend() 예제 ]

 asend() 가 반환되었을 때에는 아직 message block 에 message 가 전송되지 않았습니다. 이것으로 asend() 가 비 동기 함수임을 확인할 수 있습니다.

 약간의 시간( 10 milli second ) 이 지난 후에는 message block 에 message 가 전송된 것을 확인할 수 있습니다.

[ 그림2. asend() 예제 디버깅 화면 - 호출 직 후 ]


[ 그림3. asend() 예제 디버깅 화면 - 약간의 시간이 지난 후 ]


 

Message 필터

 Message 전송 함수인 send() 의 반환 값이 전송 결과라고 하였고, 실패할 경우 false 를 반환한다고 하였습니다. 사실, 실패할 경우란 message 를 받는 message block 이 전송을 거부할 경우, 즉 필터링되었을 경우입니다.

 결론적으로 send() 와 asend() 의 반환 값은 모두 message block 의 수락 또는 거절 여부입니다.

 여기서 집고 넘어가야 할 부분이 언제 전송이 거부되는 것인가 하는 것입니다.

 message block 은 두 가지 경우에 message 전송을 거부합니다.

 첫째는 message block 이 파괴되어 소멸자가 처리되고 있을 때입니다. 당연한 상황입니다.

 둘째는 message block 의 필터에 의해 message 가 거부당했을 때입니다. 모든 message block 의 생성자 중에는 filter_method 형의 매개변수를 갖는 생성자가 있습니다. filter_method 형은 사실 std::tr1::function<bool(_Type const&)> 입니다. message block 을 생성하는 클라이언트는 임의의 message 필터를 적용할 수 있습니다. 이 필터 함수가 false 를 반환할 경우, message 전송은 거부됩니다.

예제

#include <iostream>
#include <agents.h>

using namespace std;
using namespace Concurrency;

int main()
{
	// 필터( 짝수만 수락 )가 적용된 message block
	unbounded_buffer< int > buffer( []( int message ) -> bool
	{
		return 0 == ( message % 2 );

	} );

	wcout << L"send 1: " << ( send( buffer, 1 ) ? L"accepted" : L"declined" ) << endl;	// 거부
	wcout << L"send 2: " << ( send( buffer, 2 ) ? L"accepted" : L"declined" ) << endl;	// 수락

	wcout << L"asend 3: " << ( asend( buffer, 3 ) ? L"accepted" : L"declined" ) << endl;	// 거부
	wcout << L"asend 4: " << ( asend( buffer, 4 ) ? L"accepted" : L"declined" ) << endl;	// 수락
}

[ 코드5. 전송 거부 예제 ]

 이 예제에는 message block 의 필터로 Visual studio 2010 에서 지원하는 C++0x 의 람다를 사용하였습니다. 람다는 이 글의 논제에서 벗어나기 때문에 설명하지 않도록 하겠습니다. Visual studio 팀 블로그에서 람다에 대한 정보를 얻을 수 있습니다.

 간단히 람다에 대해서 설명하고 넘어가자면 익명의 함수 객체라고 보셔도 될 것입니다. 

 예제에 사용된 message block 의 필터는 짝수만 수락하는 필터입니다. 그래서 실행 결과로 send() 와 asend() 모두 홀수는 거부되었고, 짝수는 수락되는 것을 볼 수 있습니다.

[ 그림4. 전송 거부 예제 실행 결과 ]


 

마치는 글

 이 글에서는 message 전달 함수 중 전송 함수들에 대해서 알아보았습니다.

 이 함수들 중 어떤 것을 사용하는 것이 적절한지를 판단하기 위해서는 반드시 동기와 비 동기에 대한 개념의 이해가 필요합니다.

 상황에 따라 적절한 함수를 사용하시면 원하는 결과를 얻을 수 있을 것입니다.

 전송 함수들에 대해서 알아보았지만 아직 수신 함수들에 대해 알아보지 않았습니다. 다음 글에서는 message block 으로부터 message 를 수신하는 수신 함수들에 대해 알아보겠습니다.