본문 바로가기

개발

rabbitmq tutorial 3 - Publish/Subscribe

Publish/Subscribe

이전에서 work queue 를 만들어 봤다. work queue 뒤에 있는 가정은 각 작업이 정확히 한 명의 작업자에게 전달된다. 이 부분에서 이번에는 완전히 다른 것을 할 것이다. 여러 Consumer들에게 메시지를 전달할 것이다. 이걸 publish/subscrib 패턴이라고 한다. 

패턴을 설명하기 위해 간단한 로깅 시스템을 구축하자. 이 프로그램은 두 개의 프로그램으로 구성된다. 첫 번째 프로그램은 로그 메시지를 내보내고 두 번째 프로그램은 로그 메시지를 수신하고 출력한다.

logging 시스템에서 수신기 프로그램의 모든 실행중인 사본은 메시지를 받을 것이다. 이렇게 하면 하나의 receiver를 실행하고 로그를 디스크로 보낼 수 있다. 동시에 다른 수신기를 실행하고 화면의 로그를 볼 수 있다.

Exchanges

Producer는 메시지를 보내는 사용자 응용 프로그램
Queue는 메시지를 저장하는 버퍼
Consumer는 메시지를 수신하는 사용자 응용 프로그램

RabbitMQ의 메시징 모델에서 핵심 아이디어는 Producer가 어떤 메시지도 queue에 직접 보내지 않는다는 것입니다. 사실, Producer는 메시지가 어떤 queue로 전달될지 전혀 알지 못하는 경우가 많다.


대신에, Producer는 exchange에만 메시지를 보낼 수 있다. exchange는 매우 간단한 것이다. 한쪽은 Producer로부터 메시지를 받고 다른 쪽은 queue에 push를 한다. exchange는 받는 메시지로 무엇을 해야 하는지 정확히 알고 있어야 한다. 특정 대기열에 추가해야 하는지? 많은 대기열에 추가해야 하는지? 아니면 버려질 수도 있다. 이 규칙은 exchange type에 따라 정의된다.




exchange type 4가지 : directtopicheaders and fanout

튜토리에서는 fanout 에 집중 되어 있지만 공부를 하면서 나머지 type에 대해서도 알아 볼 것 이다. 

fanout은 그것을 알고 있는 모든 queue에 broadcast 한다. 

앞 튜토리얼에서는 exchange type을 기술하지 않았는데, type을 명시 하지 않으면 default 나 nameless한 exchange type이 지정된다. 

Temporary queues

이전에  Hello와 task_queue같은 이름을 가진 큐를 사용하고 있었습니다. 대기 행렬의 이름을 붙이는 것은 매우 중요했다. worker 들을 같은 queue로 이끌어야 했다. Producer와 consumer 간에 큐를 공유하려면 큐 이름을 지정하는 것이 중요하다.

그러나 logger의 경우는 아니다. 로그 메시지 중 일부만이 아니라 모든 로그 메시지를 보고 싶다. 또한 예전 메세지가 아닌 현재 메세지에만 관심이 있다. 그것을 해결하기 위해서는 우리는 두 가지가 필요하다.

  1. RabbitMq 에 연결 할때 마다 아무것도 들어가 있지 않은 queue가 필요하다 이러기 위해서는 랜덤이름으로 생성하거나 더 나아가서 랜덤 queue 이름을 선택 할 수 있다.
  2. queue에서 consumer의 연결을 끊으면 대기열이 자동으로 삭제되어야 한다. 

자바 클라이언트에서 queueDeclare()에 파라미터를 넣지 않으면 non-durable, exclusive, autodelete 이러한 큐를 만들게 된다.





Bindings


channel.exchangeDeclare("logs", "fanout");


이미 이걸로 fanout exchange와 queue를 만들었다. 이제 exchange한테 queue에게 메세지를 보내라고 하자. exchange와 queue의 관계를 binding이라고 한다. 

channel.queueBind(queueName, "logs", "");

이제 부터 log exchange는 queue에 메세지를 추가 할 것이다.



Putting it all together

로그 메시지를 내보내는 producer 프로그램은 이전 튜토리얼과 크게 다르지 않다. 가장 중요한 변화는 우리가 이제 nameless한 것이 아니라 우리의 log exchange에 메시지를 publish 할 수 있다. Sending 시 routingKey를 제공해야 하지만 fanout exchange에서는 값이 무시된다. EmitLog.java 프로그램의 코드는 다음과 같다.



Connection을 설정한 후에 exchange를 선언했다. 이 단계는 존재하지 않는 exchange에 publish 하는것을 금지하기 때문에 필요하다.

아직 exchange에 묶여 있지 않으면 메시지가 손실될 것이다. 하지만 괜찮다; 만약 소비자가 아직 듣지 않고 있다면 우리는 그 메시지를 안전하게 버릴 수 있다.



아래 화면은 publish/subscribe 를 했을때, subscirbe하고 있는 모든 부분에 메세지가 날아간걸 확인 할수 있다. 



Receive 받는 부분에 queue 이름을 출력 할 수 있게 찍어 놓았는데, 큐 이름을 RabbitMq mangerment 채널 페이지에서 확인 할 수 있다.