본문 바로가기

개발

spring cloud stream 적용기

진행되는 프로젝트는 boot 1.5.X 

 

org.springframework.cloud:spring-cloud-starter-stream-rabbit 최신버전으로 하니 에러 발생 했다.

 

ConfigurableCompositeMessageConverter 가 없어서 발생했는데

 

https://stackoverflow.com/questions/46473045/spring-boot-spring-cloud-stream-kafka-implementation

 

The org.springframework.integration.support.converter.ConfigurableCompositeMessageConverter is from Spring Integration 5.0:

 

보면 ConfigurableCompositeMessageConverter는 spring 5.0에서 지원 하는데 

 

 

사용 하고 있는 boot 버전 1.5.X은 <spring.version>4.3.12.RELEASE</spring.version> 라 

 

https://repo1.maven.org/maven2/org/springframework/boot/spring-boot-dependencies/1.5.8.RELEASE/spring-boot-dependencies-1.5.8.RELEASE.pom

 

1.2.X 로 써야했다....

 

[기본 샘플 모듈]

message publish - exchange - queue - message consume 

 

amqp의 장점은 자유롭게 모듈을 붙일수 있다.

 

 

[원하는 모듈]

message publish - (pub)exchange - (sub)exchange   - (topic)queue - message consume 

                                                 - (topic)queue - message consume 

 

메시지를 발행해서 pub 익스체인지로 던짐.

pub 익스체인지는 받는 sub 익스체인지로 메시지 전달.

sub 익스체인지에서 topic 방식으로 큐에게 메시지를 발행 

 

발생한 이슈 몇가지... 

 

1. 익스체인지와 익스체인지 바인딩이 yml로 안됨. 

익스체인지와 큐 바인딩은 yml 설정으로 되는데 익스체인지간 바인딩은 yml에서 

안되는것 같았다. 찾아 보니 나랑 똑같은 질문을 한 사람이 stackoverflow에 있었다.

https://stackoverflow.com/questions/55616937/bind-exchange-to-exchange-through-spring-cloud-stream-with-rabbitmq-binder

답변으로 @Bean 어노테이션으로 할 수 있다고 해놨는데, 어노테이션으로 연결하는건 해봤고 알고 있었다.

spring cloud binding을 쓰면서 yml에 모든 설정을 두고 싶어서 찾아봤는데 못찾은건지 없는건지 모르겟다.

@Bean으로 설정하기 싫은 이유는 yml에 이미 익스 체인지와 큐 바인딩을 다 지원해주고 굳이 익스체인지간 바인딩을 위해

또 새로운 클래스를 만들고 이미 설정이 다 되있는것을 다시 설정하는게 싫었다.

 

해결방법은 3가지로 나뉠거 같다.

  • 스크립트를 짜서 자동으로 바인딩이 될 수 있게 하거나 

  • @Bean 어노테이션으로 설정을 분리하거나

  • admin 툴에서 수동으로 바인딩 시키거나

 

2. 라우팅 키 설정

 

topic exchange는 topic에 따라 메시지를 전달하는 익스체인지이고 라우팅 키를 지정 해줘야 한다. 

 

또한 발행시에도 라우팅 키를 지정해서 발행해야 한다. 

 

[TODO]

1. producer 에서 라우팅 키 지정해서 메시지 publish

2. consumer 에서 지정된 라우팅 키로 메시지 consume

 

stackoverflow에 Howto set routing key for producer라고 질문한 사람이 있다. 

 

gary russell 은 spring amqp 프로젝트의 lead 개발자이다. 

 

 

위에 예시를 보면 routingKeyExpression을 써서 producer에 routingkey를 set 하면 된다. 

 

이런식으로 yml에 사용하면 되고, 소스에서 메시지를 전송할때는

 

 

MessageBuilder를 써서 payload에 메시지를 담고, header를 set 할때 key에 routingKeyExpression 에 들어간 

 

값을 넣어주면 된다.

 

MessageBuilder는 org.springframework.messaging 에 있는 클래스인데 이거 때문에 약간 헷갈렸다. 

 

amqp 스펙에서는 메시지를 보낼때 [basic.publish 메소드 프레인 + 헤더프레임 + 바디 프레임] 으로 보내는데 

 

헤더 프레임에 headers를 사용해서 보내는줄 알았다. topic exchange는 라우팅 키로 구분하는데 

 

왜 header exchange 와 같은 방식인 header에 담아서 보내는거지? 라고 생각했는데 다른거였다............

 

 

[참고]

 

https://docs.spring.io/autorepo/docs/spring-cloud-stream-docs/Chelsea.SR2/reference/htmlsingle/#_spring_cloud_stream_core

 

스프링 레퍼런스에 15.3.2. RabbitMQ Consumer Properties 설정이 있다.

 

routingKeyExpression

A SpEL expression to determine the routing key to use when publishing messages. For a fixed routing key, use a literal expression, such as routingKeyExpression='my.routingKey' in a properties file or routingKeyExpression: '''my.routingKey''' in a YAML file.

 

consumer 에서 받을때는 @StreamListner에서 라우팅 키를 지정하고 받으면 된다. 

 

추가로 메시지를 발행하고 나서 전송되기 직전에 메시지가 어떤 라우팅 키와 메시지로 세팅되나 궁금했는데 

 

MessageChannel.send - AbstractMessageChannel.send 에서 debug를 태우면 된다. 

 

 

 

그리고 아래 그림과 같이 spring cloud stream에서 알아서 rabbitmq와 연결해줘서 편리해진거 같기도 한데 

 

amqp 메시지로 어떻게 전송되는지는 좀 알기 힘들어 진거 같기도 하다.