Asynchronous Agents Library
– message block 2. ( unbounded_buffer )

작성자: 임준환( mumbi at daum dot net )

 

시작하는 글

 이전 글에서 message block 의 인터페이스인 ISourceITarget 인터페이스에 대해서 알아보았습니다. 이번 글부터 그 인터페이스들을 상속받아 구현한 message block 에 대해 알아보겠습니다.

 Message block 은 버퍼( buffer ) 를 가질 수도 있고, 상태만 가질 수도 있고, 기능만 가질 수도 있습니다. 그러므로 각 message block 들의 특징을 잘 파악하고, 언제 필요한지 알아야 합니다.

 이번 글에서는 가장 범용적인 유용한 unbounded_buffer 에 대해서 알아보도록 하겠습니다.

 

unbounded_buffer< _Type >

 unbounded_buffer 는 message block 중 가장 많이 사용될 것입니다. unbounded_buffer 는 내부적으로 큐( queue )를 구현하고 있어 message 저장소 역할을 합니다. 누군가 unbounded_buffer 에 message 를 보내면 unbounded_buffer 에 순서대로 차곡차곡 쌓이고, 쌓인 순서대로 꺼내서 쓸 수 있습니다. 꺼낸 message 는 unbounded_buffer 에서 제거됩니다. 그렇기 때문에 여러 곳에서 같은 message 를 꺼내 받을 수 없습니다.

 이런 작업들은 비 동기 agent 들과 사용할 때, 빛을 발합니다.

 unbounded_buffer 는 스레드에 안전하므로 직접 lock 을 하지 않아도 됩니다.

 

생성자

 

unbounded_buffer() – 기본 생성자

 빈 unbounded_buffer 를 생성합니다.

 

unbounded_buffer( filter_method const& _Filter )

 빈 unbounded_buffer 를 생성합니다. 하지만 필터 함수를 지정하여 받을 수 있는 메시지를 거를 수 있습니다.

 이 필터 함수는 bool (_Type const &) 형의 시그니처( signature )를 갖습니다.

 

멤버 함수

 

bool enqueue( _Type const& _Item )

 하나의 message 를 unbounded_buffer 에 보냅니다.

 message 전송이 성공이면 true, 아니면 false 를 반환합니다.

 내부적으로 이 함수는 message 전달 함수인 send() 를 사용합니다. 그리고 send() 의 결과를 반환합니다.

 

_Type dequeue()

 unbounded_buffer 에서 하나의 message 를 꺼냅니다. 꺼낸 message 는 큐에서 제거됩니다.

 꺼내진 message 를 반환합니다. 그리고 꺼내진 message 는 unbounded_buffer 내부에서 제거됩니다.

 enqueue() 와 마찬가지로 내부적으로 message 전달 함수인 receive() 를 사용합니다. receive() 가 반환한 값을 반환합니다.

 

예제

 unbounded_buffer 를 사용하여 작은 시나리오를 구현해보도록 하겠습니다.

시나리오

 윈도우즈 OS 는 사용자의 이벤트들을 메시지 큐에 담고, 큐에 들어온 메시지들을 순차적으로 꺼내서 처리하는 메커니즘을 사용합니다. 이 시나리오를 agent 와 unbounded_buffer 를 이용하여 간단하게 구현해보겠습니다.

코드

#include <iostream>
#include <string>
#include <agents.h>

using namespace std;
using namespace Concurrency;

// 메시지 객체
class Message
{
	wstring		message;

public:
	Message( const wstring& message )
		: message( message ) { }

	const wstring& GetMessage() const
	{
		return this->message;
	}
};

// 메시지를 발생하는 사용자 agent
class User
	: public agent
{
	ITarget< Message >&	messageQueue;

public:
	User( ITarget< Message >& target )
		: messageQueue( target ) { }

	void ClickMouseLButton()
	{
		send( this->messageQueue, Message( L"WM_LBUTTONDOWN" ) );
		send( this->messageQueue, Message( L"WM_LBUTTONUP" ) );		
	}

	void DragMouseLButton()
	{
		send( this->messageQueue, Message( L"WM_LBUTTONDOWN" ) );
		send( this->messageQueue, Message( L"WM_MOUSEMOVE" ) );
		send( this->messageQueue, Message( L"WM_LBUTTONUP" ) );
	}

	virtual void run()
	{
		this->ClickMouseLButton();
		
		Concurrency::wait( 1000 );

		this->DragMouseLButton();

		this->done();
	}
};

// 발생한 메시지들을 처리하는 메시지 펌프 agent
class MessagePump
	: public agent
{
	ISource< Message >&	messageQueue;

public:
	MessagePump( ISource< Message >& source )
		: messageQueue( source ) { }

	void ProcessMessage( const Message& message )
	{
		wcout << message.GetMessage() << endl;
	}

	virtual void run()
	{
		while( true )
		{
			Message message = receive( this->messageQueue );

			this->ProcessMessage( message );		
		}

		this->done();
	}
};

int main()
{
	// 메시지 큐
	unbounded_buffer< Message >	messageQueue;

	// 메시지를 발생하는 사용자와 메시지 펌프
	User user( messageQueue );
	MessagePump messagePump( messageQueue );

	// agent 시작.
	user.start();
	messagePump.start();	

	// agent 의 작업이 모두 끝날 때까지 대기
	agent* agents[] = { &user, &messagePump };
	agent::wait_for_all( 2, agents );
}

[ 코드1. agent 와 unbounded_buffer 를 이용한 메시지 펌프 간략 구현 ]

 Message 클래스는 단순히 문자열을 래핑( wrapping ) 하는 클래스로 큐에 저장되는 메시지를 나타냅니다.

 agent 로 2개를 정의하였는데 하나는 사용자가 이벤트 메시지를 발생하는 것을 흉내 낸 User 클래스이고, 다른 하나는 메시지 펌프를 간략화한 MessagePump 클래스입니다.

 사용자 agent( User 객체 )는 약간의 시간차를 두고 이벤트 메시지를 발생합니다. 발생된 메시지는 메시지 큐에 저장됩니다.

 메시지 펌프 agent 는 메시지가 저장될 때까지 대기하다가 메시지가 저장되면 그 메시지를 받아서 처리합니다. 처리된 메시지는 메시지 큐에서 제거됩니다.

 agent 들의 start() 를 사용하여 작업을 시작하고, 모든 agent 의 작업이 끝날 때까지 기다립니다.

 실제로는 하나의 agent 가 무한 루프를 수행하므로 프로그램이 종료되지 않습니다.

 위의 예제처럼 데이터를 보내고, 순차적으로 받아서 처리하고 싶을 때, 내부적으로 큐가 필요할 때 유용한 message block 이 바로 unbounded_ buffer 입니다.

 멀티 스레드 프로그래밍 시 자료 구조 중 큐가 많이 사용되기 때문에 unbounded_buffer 도 많이 사용하게 될 것입니다.

 

[ 그림1. agent 와 unbounded_buffer 를 이용한 메시지 펌프 간략 구현 결과 ]

[ 그림1. agent 와 unbounded_buffer 를 이용한 메시지 펌프 간략 구현 결과 ]

 

마치는 글

 이번 글에서는 message block 중 가장 사용도가 높은 unbounded_buffer 에 대해서 알아보았습니다. unbounded_buffer 이외에도 다양한 message block 이 있습니다.

 다음 글에서는 overwrite_buffer 라는 message block 에 대해서 알아보도록 하겠습니다.