진행되는 프로젝트는 boot 1.5.X
org.springframework.cloud:spring-cloud-starter-stream-rabbit 최신버전으로 하니 에러 발생 했다.
ConfigurableCompositeMessageConverter 가 없어서 발생했는데
https://stackoverflow.com/questions/46473045/spring-boot-spring-cloud-stream-kafka-implementation
The org.springframework.integration.support.converter.ConfigurableCompositeMessageConverter is from Spring Integration 5.0:
보면 ConfigurableCompositeMessageConverter는 spring 5.0에서 지원 하는데
사용 하고 있는 boot 버전 1.5.X은 <spring.version>4.3.12.RELEASE</spring.version> 라
1.2.X 로 써야했다....
[기본 샘플 모듈]
message publish - exchange - queue - message consume
amqp의 장점은 자유롭게 모듈을 붙일수 있다.
[원하는 모듈]
message publish - (pub)exchange - (sub)exchange - (topic)queue - message consume
- (topic)queue - message consume
메시지를 발행해서 pub 익스체인지로 던짐.
pub 익스체인지는 받는 sub 익스체인지로 메시지 전달.
sub 익스체인지에서 topic 방식으로 큐에게 메시지를 발행
발생한 이슈 몇가지...
1. 익스체인지와 익스체인지 바인딩이 yml로 안됨.
익스체인지와 큐 바인딩은 yml 설정으로 되는데 익스체인지간 바인딩은 yml에서
안되는것 같았다. 찾아 보니 나랑 똑같은 질문을 한 사람이 stackoverflow에 있었다.
답변으로 @Bean 어노테이션으로 할 수 있다고 해놨는데, 어노테이션으로 연결하는건 해봤고 알고 있었다.
spring cloud binding을 쓰면서 yml에 모든 설정을 두고 싶어서 찾아봤는데 못찾은건지 없는건지 모르겟다.
@Bean으로 설정하기 싫은 이유는 yml에 이미 익스 체인지와 큐 바인딩을 다 지원해주고 굳이 익스체인지간 바인딩을 위해
또 새로운 클래스를 만들고 이미 설정이 다 되있는것을 다시 설정하는게 싫었다.
해결방법은 3가지로 나뉠거 같다.
-
스크립트를 짜서 자동으로 바인딩이 될 수 있게 하거나
-
@Bean 어노테이션으로 설정을 분리하거나
-
admin 툴에서 수동으로 바인딩 시키거나
2. 라우팅 키 설정
topic exchange는 topic에 따라 메시지를 전달하는 익스체인지이고 라우팅 키를 지정 해줘야 한다.
또한 발행시에도 라우팅 키를 지정해서 발행해야 한다.
[TODO]
1. producer 에서 라우팅 키 지정해서 메시지 publish
2. consumer 에서 지정된 라우팅 키로 메시지 consume
stackoverflow에 Howto set routing key for producer라고 질문한 사람이 있다.
gary russell 은 spring amqp 프로젝트의 lead 개발자이다.
위에 예시를 보면 routingKeyExpression을 써서 producer에 routingkey를 set 하면 된다.
이런식으로 yml에 사용하면 되고, 소스에서 메시지를 전송할때는
MessageBuilder를 써서 payload에 메시지를 담고, header를 set 할때 key에 routingKeyExpression 에 들어간
값을 넣어주면 된다.
MessageBuilder는 org.springframework.messaging 에 있는 클래스인데 이거 때문에 약간 헷갈렸다.
amqp 스펙에서는 메시지를 보낼때 [basic.publish 메소드 프레인 + 헤더프레임 + 바디 프레임] 으로 보내는데
헤더 프레임에 headers를 사용해서 보내는줄 알았다. topic exchange는 라우팅 키로 구분하는데
왜 header exchange 와 같은 방식인 header에 담아서 보내는거지? 라고 생각했는데 다른거였다............
[참고]
스프링 레퍼런스에 15.3.2. RabbitMQ Consumer Properties 설정이 있다.
routingKeyExpression
A SpEL expression to determine the routing key to use when publishing messages. For a fixed routing key, use a literal expression, such as routingKeyExpression='my.routingKey' in a properties file or routingKeyExpression: '''my.routingKey''' in a YAML file.
consumer 에서 받을때는 @StreamListner에서 라우팅 키를 지정하고 받으면 된다.
추가로 메시지를 발행하고 나서 전송되기 직전에 메시지가 어떤 라우팅 키와 메시지로 세팅되나 궁금했는데
MessageChannel.send - AbstractMessageChannel.send 에서 debug를 태우면 된다.
그리고 아래 그림과 같이 spring cloud stream에서 알아서 rabbitmq와 연결해줘서 편리해진거 같기도 한데
amqp 메시지로 어떻게 전송되는지는 좀 알기 힘들어 진거 같기도 하다.
'개발' 카테고리의 다른 글
Failed to validate connection com.mysql.cj.jdbc.ConnectionImpl@3900e5a6 (No operations allowed after connection closed.). Possibly consider using a shorter maxLifetime value. (0) | 2021.06.15 |
---|---|
layerd jar가 필요한 이유와 적용방법 - docker build 속도개선 (0) | 2020.08.22 |
HTTP 완벽 가이드 - http 메시지 (0) | 2019.09.09 |
http 완벽 가이드 - URL 과 리소스 (0) | 2019.09.08 |
AMQP 프로토콜의 메시지 소비 과정 (0) | 2019.04.24 |