Asynchronous Agents Library
– message block 3. ( overwrite_buffer & single_assignment )

작성자: 임준환( mumbi at daum dot net )

 

시작하는 글

 지난 글 unbounded_buffer 에 이어 또 다른 message block 인 overwrite_buffersingle_assignment 에 대해서 알아보도록 하겠습니다.

 Message block 들은 특징들이 모두 다르기 때문에 지난 unbounded_buffer 을 생각하시면 이해가 어려울 수 있습니다. message block 하나 하나의 쓰임새가 다르므로 새로운 것을 알아본다고 생각하시는 것이 좋을 것 같습니다.

 

overwrite_buffer< _Type >

 지금부터 설명드릴 overwrite_bufferunbounded_buffer 와는 달리 하나의 변수라고 생각하시면 이해하기 쉬울 것입니다.

 Concurrency runtime 을 사용하지 않고, 스레드 간의 상태나 정보를 공유하려면 전역 변수나 힙( heap ) 에 할당된 변수에 락( lock ) 을 걸어 사용해야 합니다.

 overwrite_buffer 는 방금 언급한 번거로운 작업들을 알아서 해줍니다. 내부에서 힙에 메모리를 할당하고, 접근 시 락을 겁니다. 하지만 사용하는 우리는 그런 것들을 신경 쓰지 않고 마치 지역 변수처럼 사용할 수 있습니다.

 unbounded_buffer 는 외부에서 message 를 받아가면 내부에서 해당 message 가 제거되는 반면에, overwrite_buffer 는 제거되지 않습니다. 또한 하나의 변수와도 같기 때문에 외부에서 message 를 보내면 이 전의 message 를 덮어쓰고 새 message 가 저장됩니다.

 결국 overwrite_buffer 는 단 하나의 message 만을 갖게 됩니다.

 그럼 overwrite_buffer 의 멤버 함수에 대해서 알아보도록 하겠습니다.

 

멤버 함수

 생성자와 소멸자를 제외한 public 인 멤버 함수들입니다.

 

bool has_value() const

 현재 message 를 가지고 있는지 반환합니다.

 어떠한 message 도 갖지 않을 경우에 false 를 반환합니다. 만약, 한번이라도 overwrite_buffer 에 message 가 전달된다면 그 후부터는 true 를 반환합니다. overwrite_buffer 는 외부에서 message 를 받아가도 내부의 message 가 제거되지 않기 때문입니다.

 Message 를 갖고 있지 않을 때, 외부에서 동기 함수인 receive() 를 사용해 message 를 얻기를 원한다면 overwrite_buffer 에 message 가 들어올 때까지 기다립니다. message 가 제거되지 않기 때문에 한번이라도 overwrite_buffer 가 message 를 받으면 receive() 가 기다리는 일은 없을 것입니다.

 

_Type value();

 현재 가지고 있는지 message 를 반환합니다.

 내부적으로 동기 전달 함수인 receive() 를 사용하므로 message 를 가지고 있지 않다면 message 를 갖게 될 때까지 기다립니다. 만약 이 때, has_value() 를 호출했다면 false 를 반환할 것입니다.

 Message 가 제거되지 않기 때문에 전달 함수를 이용해 message 를 받아갈 경우, 복사본이 전달됩니다.

 

예제

 overwrite_buffer 의 간단한 예제를 구현해보도록 하겠습니다.

 

시나리오

 네트워크 지연 시간을 갱신하고, 출력하는 프로그램을 작성할 것입니다.

 네트워크 지연 시간을 갱신하는 역할을 하는 agent 와 갱신된 정보를 출력하는 agent 가 하나의 overwrite_buffer 를 공유하여 사용하는 예제입니다.

 

코드

#include <iostream>
#include <array>
#include <agents.h>

using namespace std;
using namespace Concurrency;

// 지연 시간을 얻어오는 agent.
class PingUpdater
	: public agent
{
public:
	PingUpdater( const array< unsigned int, 5 >& delayTimeSource, ITarget< unsigned int >& targetBlock )
		: delayTimeSource( delayTimeSource )
		, targetBlock( targetBlock ) { }

protected:
	// 2초마다 지연 시간을 얻어 옴.
	void run()
	{
		while( true )
		{
			asend( this->targetBlock, this->GetDelayTime() );

			Concurrency::wait( 2000 );
		}

		this->done();
	}

	// 지연 시간을 시뮬레이션하는 함수.
	unsigned int GetDelayTime()
	{
		static unsigned int index = 0;

		unsigned int delayTime = this->delayTimeSource[ index ];

		if( index + 1 < this->delayTimeSource.size() )
			++index;		
		else
			index = 0;

		return delayTime;
	}

private:
	const array< unsigned int, 5 >&	delayTimeSource;
	ITarget< unsigned int >&		targetBlock;
};

// 지연 시간을 출력하는 agent.
class PingDisplayer
	: public agent
{
public:
	PingDisplayer( ISource< unsigned int >& sourceBlock )
		: sourceBlock( sourceBlock ) { }
protected:
	// 1초마다 지연 시간을 출력한다.
	void run()
	{
		while( true )
		{
			this->Display( receive( this->sourceBlock ) );

			Concurrency::wait( 1000 );
		}

		this->done();
	}

	// 지연 시간을 출력하는 함수.
	void Display( unsigned int delayTime )
	{
		wcout << L"current delay time: " << delayTime << endl;
	}

private:
	ISource< unsigned int >&	sourceBlock;
};

int main()
{
	// 네트워크 지연 시간의 시뮬레이션 정보.
	array< unsigned int, 5 > delayTimeSource = { 210, 211, 261, 246, 223 };

	// 공유 버퍼
	overwrite_buffer< unsigned int > delayTimeBuffer;

	// 네트워크 지연 시간을 갱신하는 agent 와 출력하는 agent.
	PingUpdater updater( delayTimeSource, delayTimeBuffer );
	PingDisplayer displayer( delayTimeBuffer );

	// agent 시작.
	updater.start();
	displayer.start();

	// agent 의 작업이 모두 끝날 때까지 대기.
	agent* waitingAgents[2] = { &updater, &displayer };
	agent::wait_for_all( 2, waitingAgents );
}

[ 코드1. overwrite_buffer 를 이용한 네트워크 지연 시간 갱신 및 출력 예제 ]

 PingUpdater 클래스는 agent 클래스로 네트워크 지연 시간을 갱신하는 역할을 합니다. 2초에 한 번씩 시뮬레이션을 위해 준비된 정보를 순회하며 얻어와서 overwrite_buffer 에 전달합니다.

 PingDisplayer 클래스도 agent 클래스로 갱신된 네트워크 지연 시간을 화면에 출력하는 역할을 합니다. 1초에 한번씩 overwrite_buffer 로부터 갱신된 정보를 가져와서 화면에 출력합니다.

 예제에서 사용된 Concurrency::wait() 는 Win32 API 의 Sleep() 과 같은 역할을 합니다. agent::wait() 과 혼동하지 않길 바랍니다.

 위 코드를 보시면 굉장히 직관적이고, 간단하게 멀티 스레드 프로그래밍을 할 수 있다는 것을 알 수 있을 것입니다.

 

[ 그림1. overwrite_buffer 를 이용한 네트워크 지연 시간 갱신 및 출력 예제 ]

[ 그림1. overwrite_buffer 를 이용한 네트워크 지연 시간 갱신 및 출력 예제 ]

 

single_assignment< _Type >

 single_assignment 는 위에서 설명한 overwrite_buffer 와 거의 흡사합니다.

 단지 다른 점이 있다면 message 를 한번만 받을 수 있다는 것입니다. 만약 두 번 이상 보낸 다면 두 번째부터는 무시됩니다.

 멤버 함수 또한 거의 같지만, 다른 점을 알아보겠습니다.

 

멤버 함수

 생성자와 소멸자를 제외한 public 인 함수들입니다.

 

bool has_value() const

 위에서 설명한 overwrite_bufferhas_value() 와 같습니다.

 Message 를 단 한번도 받지 않았다면 false 를 반환하고, 받았다면 true 를 반환합니다.

 

_Type const & value()

 overwrite_buffervalue() 와 같은 기능을 합니다.

 하지만 값을 반환하지 않고 const 참조를 반환한다는 것이 다릅니다.

 overwrite_buffervalue() 와 마찬가지로 message 를 갖고 있지 않다면 message 를 갖게 될 때까지 기다립니다.

 

마치는 글

 이번 글에서는 overwrite_buffersingle_assignment 에 대해서 알아보았습니다.

  single_assignment 는 overwrite_buffer 와 거의 흡사하기 때문에 예제는 생략하였습니다.

 지난 글에서 본 unbounded_buffer 와는 분명히 쓰임새가 다르므로 특징을 잘 파악해두시면 좋을 것입니다.

 다음 글에서 또 다른 message block 을 소개해드릴 것입니다. 그 message block 또한 쓰임새가 분명히 다르므로 Asynchronous Agents Library 의 활용도가 굉장히 넓다는 것을 아시게 될 것입니다.

이 글은 MSDN 글, "Solving The Dining Philosophers Problem With Asynchronous Agents"를 참고하여 작성되었습니다.

Asynchronous Agents Library로 Dining Philosophers 문제 해결하기 - 1

자, 이제 본격적으로 코드를 살펴보기 전에 메시지 블록이 무엇인지 먼저 짚고 넘어가겠습니다. AAL액터모형을 사용한다고 말씀드렸습니다. 또한, 액터모형에서 액터들은 메시지만으로 통신한다고 말씀드렸죠. 이 때 메시지를 받는 대상 혹은 메시지의 출처의 역할을 하는 것이 메시지 블록입니다. 전자의 경우 목적(target) 블록이라 하고, 후자는 원천(source) 블록이 됩니다.

전회에서 이번 예제에 쓰이는 네가지 메시지 블록을 소개했었는데요. unbounded_buffer는 목적 및 원천으로 쓰이며 큐와 같이 여럿의 메시지를 담고 있을 수 있는 놈입니다. overwrite_buffer는 하나의 변수처럼 값 하나만을 지니며, 새로 메시지가 올 경우 기존 값은 덮어씌여집니다. 역시 원천으로도 쓰일 수 있으며, 이 경우 사본을 보냅니다. 반면, call목적 블록으로만 쓰여 메시지 도착 시 특정 함수개체를 불러주는 기능을 합니다. join은 이번 예제에서 핵심 역할을 하는 블록으로서 여러 메시지를 동시에 기다려 하나로 묶어 출력하는 기능을 합니다.

먼저 가장 간단한 Chopstick 클래스를 살펴보죠.

   22 class Chopstick{

   23     const std::string m_Id;

   24 public:

   25     Chopstick(std::string && Id):m_Id(Id){};

   26     const std::string GetID()

   27     {

   28         return m_Id;

   29     };

   30 };


이와 같이 젓가락 식별용의 문자열을 가질뿐입니다. 생성자에서 r-value 참조를 쓰고 있다는 것 정도가 주목할만한 사항이겠군요.

다음은 ChopstickProvider로 다음과 같이 단순히 typedef입니다.

   34 typedef Concurrency::unbounded_buffer<Chopstick*> ChopstickProvider;


unbounded_buffer 메시지 블록을 이용해 메시지로 젓가락을 받으면 담고 있다가 철학자의 요청이 있으면 제공하는 역할을 합니다. 물록 철학자가 한입 먹고 나선 다시 젓가락을 놓으면 다시 받아놓는 역할도 합니다. 이 예제에서는 unbounded_buffer의 개수무제한(unbounded) 특성이 사실 굳이 필요 없습니다만 그래도 unbounded_buffer의 move semantic이 필요하기에(이 점에서 사본을 보내는 overwrite_buffer와는 다르죠) 이를 쓰는 것입니다.

다음이 대망의 Philosopher 클래스가 되겠습니다. 먼저, Concurrency::agent에서 public 상속을 받고 있는 것을 확인할 수 있습니다. 말씀드린 것처럼 각 철학자가 액터가 되어 독립적으로 동작하기 (즉, 별도 스레드로) 위함입니다.

   35 class Philosopher : public Concurrency::agent

   36 {

   37     ChopstickProvider* m_LeftChopstickProvider;

   38     ChopstickProvider* m_RightChopstickProvider;

   39 

   40 public:

   41     const std::string m_Name;

   42     const size_t  m_Bites;

   43     Philosopher(const std::string&& name, size_t bites=10):m_Name(name),m_Bites(bites){};

   44     Concurrency::unbounded_buffer<ChopstickProvider*> LeftChopstickProviderBuffer;

   45     Concurrency::unbounded_buffer<ChopstickProvider*> RightChopstickProviderBuffer;

   46     Concurrency::overwrite_buffer<PhilosopherState> CurrentState;

   47     void run()

   48     {

   49 

   50         //run에서 제일 먼저 해야하는 것은 ChopstickProvider를 초기화하는 것입니다. 여기서는 receive를 통해 public 변수에 메시지가 도착하기를 기다리게 하는 방식으로 처리합니다:

   51 

   52         //ChopstickProvider들을 초기화합니다.

   53         m_LeftChopstickProvider  = Concurrency::receive(LeftChopstickProviderBuffer);

   54         m_RightChopstickProvider = Concurrency::receive(RightChopstickProviderBuffer);

   55 

   56         //이제 생각하다가 먹기를 반복해야 합니다. 그를 위해 아직 등장하지 않은 두 함수(PickupChopsticks과 PutDownChopsticks)를 이용하려고 합니다:

   57 

   58         for(size_t i = 0; i < m_Bites;++i)

   59         {

   60             Think();

   61             std::vector<Chopstick*> chopsticks(PickupChopsticks());

   62             Eat();

   63             PutDownChopsticks(chopsticks);

   64         }

   65 

   66         //남은 일은 run 메소드를 나가기 전에 정리 작업을 하는 것인데, 다른 곳에 쓰일 수 있도록 ChopstickProvider를 반환하고 에이전트의 상태를 완료로 설정하고 있습니다.

   67         Concurrency::send(LeftChopstickProviderBufferm_LeftChopstickProvider);

   68         Concurrency::send(RightChopstickProviderBuffer, m_RightChopstickProvider);

   69 

   70         this->done(Concurrency::agent_done);

   71     }

   72 

   73     std::vector<Chopstick*> PickupChopsticks()

   74     {

   75         //join 생성

   76         Concurrency::join<Chopstick*,Concurrency::non_greedy> j(2);

   77         m_LeftChopstickProvider->link_target(&j);

   78         m_RightChopstickProvider->link_target(&j);

   79 

   80         //젓가락을 듭니다.

   81         return Concurrency::receive (j);

   82     } 

   83     void PutDownChopsticks(std::vector<Chopstick*>& v)

   84     {

   85         Concurrency::asend(m_LeftChopstickProvider,v[0]);

   86         Concurrency::asend(m_RightChopstickProvider,v[1]);

   87     }

   88 private:

   89     void Eat()

   90     {

   91         send(&CurrentState,Eating);

   92         RandomSpin();

   93     };

   94     void Think()

   95     {

   96         send(&CurrentState,Thinking);

   97         RandomSpin();

   98     };

   99 };


그 다음으로 한쌍의 젓가락을 위한 두 ChopstickProvider 포인터 변수(m_LeftChopstickProvider, m_RightChopstickProvider)가 보입니다. 철학자 이름(m_Name)과 몇번 먹을지를 나타내는 변수(m_Bites), 생성자까지는 파악하시는데 어려움이 없을 겁니다.

ChopstickProvider (이 자체도 unbounded_buffer인데) 포인터를 템플릿 인자로 가지는 unbounded_buffer 변수 한쌍이 등장하는데요. (44,45줄) 철학자가 젓가락을 소유하고 있는 상황이 아니고 철학자와는 별개로 젓가락들이 존재하는 상황이기에 필요한 변수들입니다. 이 두 public 변수들을 통해, 나중에 철학자들에게 필요할 때 젓가락을 제공해주는 ChopstickProvider를, 어딘가에서 받을 수 있습니다. 이들을 갖추고 나면 그 후부터 생각하다가 먹다가 할 수 있겠죠.

그 뒤로 run 메소드가 나옵니다. 실제 액터가 구동되면 수행될 함수입니다. 먼저, 전술한 두 변수를 통해 ChopstickProvider가 제공되기를 기다립니다. 이 때 Concurrency::receive 함수를 쓰고 있습니다. (이의 비동기 버전인 Concurrency::try_receive도 있습니다.)

58줄부터는 생각하다 먹기를 반복하는 반복문이 나옵니다. ThinkEat 함수는 89줄 이하에서 확인할 수 있는 것처럼 철학자의 현재 상태를 나타내는 overwrite_buffer 형의 변수 CurrentState를 설정하는 것 이외에는 특별히 하는 일이 없습니다. 그냥 시간을 좀 지체할 뿐입니다.

그리고 이 두 함수 호출 사이에 PickupChopsticksPutDownChopsticks 함수를 써서 실제 가장 중요한 젓가락 한 쌍을 안전하게 획득하고 다시 내려놓는 일을 합니다.


이에 대한 설명은 다음 회를 기대해주세요~ ^^
이 글은 MSDN 글, "Solving The Dining Philosophers Problem With Asynchronous Agents"를 참고하여 작성되었습니다.

오늘은 AAL(Asynchronous Agents Library)의 액터기반프로그래밍을 사용하여, 동기화 개체들로는 해법이 상당히 골치아프기로 유명한 "철학자들의 식사(Dining Philosophers) 문제"를 풀어보겠습니다. 내용이 길어질듯 하여 3회의 연재글로 구성하려 합니다.

먼저 간단히 철학자들의 식사 문제를 소개하면,


간단히 위 그림과 같은 상황입니다. 철학자 다섯명이 식사를 하는데 젓가락(그림에는 포크지만 상관없습니다;)이 보시는바와 같이 역시 다섯개뿐입니다. 그들은 철학자답게 생각하다가 한입 먹다가를 반복합니다. 한입 먹으려면 젓가락 한쌍이 필요해서 옆사람이 사이에 놓인 젓가락을 이미 선점해 먹고 있다면 기다려야 하는 것이죠. 공유 상태를 고려하지 않고 구현하면 데드락 등으로 철학자가 굶는(starvation) 상황이 발생할 수 있습니다. 이 문제는 저명한 컴퓨터과학자 다익스트라가 처음 제시하였습니다. 모니터 등의 동기화 개체를 사용하여 해결하는 방법이 기존에 많이 설명되어 있습니다만... 솔직히 이해하기가 쉽지 않고 구현도 어렵습니다.

이때 AAL이 제공하는 액터모형을 이용하면 그러한 난해함이나 복잡함 없이 이 문제를 해결할 수 있습니다. 액터모형은 독립적으로 동작하며 서로간에는 오로지 메시지만으로 소통하는(즉, 공유 상태를 가지지 않는) 액터들로 시스템을 모델링하는 방법이라 하겠습니다.

본 예제에서는 철학자를 액터(AAL 용어로는 에이전트)로 보고 메시지 전달을 위해 AAL에서 제공하는 몇몇 메시지 블록(message block)들을 사용하여 철학자들의 식사 문제를 해결합니다.

다음과 같은 다섯 클래스들을 작성하게 됩니다.

  • Chopstick 클래스
  • 식탁 위의 젓가락을 실제 소유하며 요청에 따라 철학자에게 제공하는 역할을 하는 ChopstickProvider 
  • 생각하고 먹는 에이전트 역할의 Philosopher 클래스. 이 클래스는 한쌍의 ChopstickProvider와만 소통합니다.
  • 생각하고 먹는 상태를 나타내는 PhilosopherState 열거형
  • 젓가락들과 철학자들이 배치될 Table 클래스

이 과정에서 다음과 같은 AAL의 클래스 및 함수들을 이용합니다.
  • Concurrency::agent - 에이전트 기반 클래스
  • 이하는 메시지 블록에 속하는 여러 타입들
    • Concurrency::unbounded_buffer
    • Concurrency::overwrite_buffer
    • Concurrency::join
    • Concurrency::call
  • 이상의 메시지 블록들에 메시지를 주고 받는데 사용하는 함수들
    • Concurrency::send
    • Concurrency::asend - 위의 비동기 버전으로, 받음 여부를 확인하지 않고 바로 리턴
    • Concurrency::receive

본격적인 구현 과정은 다음 회에 계속됩니다~ ^^