본문 바로가기

개발

spring cloud stream rabbitmq 처리량을 늘리는 방법

Spring Cloud 란 마이크로 서비스를 구축하기 위한 프레임워크들

들어가기 전에 개념 정리 부터 해야할거 같아서 간단하게 찾아봤다. 

  • 마이크로서비스를 구축하기 위해 필요한 라이브러리의 집합 (프레임워크).
  • Spring Cloud는 spring-cloud-xxxxx 프로젝트들의 umbrella 프로젝트이다. 

Spring Cloud Stream

  • 외부 시스템에 연결할 수 있는 애플리케이션을 신속하게 구축할 수 있는 경량의 마이크로 서비스 프레임 워크.
  • 메시지 기반 마이크로 서비스를 구현하기 위한 프레임 워크

→ Spring Boot를 기반으로 DevOps 친화적인 마이크로 서비스 애플리케이션을 만들고
Spring Integration은 메시지 브로커와의 연결을 제공합니다.

 Spring Cloud Stream은 메시지 브로커의 독창적인 구성을 제공하여 여러 미들웨어 공급 업체에 pub/sub,
소비자 그룹 및 파티션 개념을 도입합니다.

스프링 클라우드 스트림을 통해 개발자는 다음을 수행할 수 있습니다.

  • 데이터 중심 애플리케이션을 개별적으로 구축, 테스트, 반복 및 구축합니다.
  • 메시징을 통한 구성을 포함하여 최신 마이크로 서비스 아키텍처 패턴을 적용합니다.
  • 애플리케이션 책임을 이벤트 중심적 사고와 분리합니다. 이벤트는 다운스트림 소비자 애플리케이션이 어디에서 발생했는지 또는 생산자의 신원을 알지 못한 채 반응할 수 있는 시간에 발생한 무언가를 나타낼 수 있다.
  • 비즈니스 로직을 메시지 브로커(예: RabbitMQ, Apache Kafka, Amazon Kinesis)로 포팅합니다.
  • Project Reactor의 Flux 및 Kafka Streams API를 사용하여 채널 기반 및 비채널 기반 애플리케이션 바인딩 시나리오 간에 상호 운용하여 상태 비저장 및 상태 저장 계산을 지원합니다.
  • 일반적인 사용 사례에 대한 프레임워크의 자동 콘텐츠 유형 지원을 사용합니다. 다른 데이터 변환 유형으로 확장할 수 있습니다

스트림 프로세싱은 일괄처리와는 다르게 다음 일괄처리 시간까지 기다리지 않고 데이터가 도착할 때마다 지속적으로 처리되는 것

아무런 설정없이 돌리면 메인스레드와 stream 을 받는 스레드 2개로 돌아간다

 

1. What is the RabbitMQ prefetch count?

RabbitMQ 에서는 prefetch count 라는 옵션 값이 있다.

The short answer: The RabbitMQ prefetch value 얼마나 많은 메세지를 한번에 보내는지에 사용된다.

The best practice is to set a consumer prefetch by setting a limit on the number of unacked messages at the client.

prefetch count 에 따라서 메모리로 갯수만큼 가져오게 된다.

prefetchCount 옵션은 acknowledgeMode 가 NONE 이면 무시된다.

RabbitMQ acknowledge mode auto 설정은 메시지에 대한 클라이언트 ack 없이 전송 후에 바로 큐에서 삭제한다

prefetchCount 값을 증가시킬려면  batchSize  와 messagePerAck 를 맞춰야 한다.

prefetch count는 spring cloud stream 2.0부터 기본 설정이 250개로 변경되었고 1로 지정하면 확실한 순서보장을 할 수 있다.

AbstractMessageListenerContainer 에서는

public static final int *DEFAULT_PREFETCH_COUNT* = 250; 이라고 되있는데

실제로 동작 시켜보면 prefetch 가 1로 동작한다.

 

prefetch count를 10으로 하고 서버 10대에 consumer를 4개로 지정하면

RabbitMQ Management에 보이는 Unacked 개수는 최대 (10*10*4) => 400까지가 나올수 있다.

Ack required가 false여서 O 표시로 되어 있고 autoAck인 경우 prefetch count 설정이 무시된다.

하지만 prefetch count 설정을 한다고 해서 처리량이 늘어나지는 않는다!!!!!

왜냐 메모리에서 가져온다고 빨리 처리되는건 아니니깐 처리량은 늘어나지 않는다. 

2. @Async 어노테이션 사용

@Async 는 별도의 쓰레드를 사용해서 병렬처리를 한다.

@Async는 SimpleAsyncTaskExecutor를 사용하도록 되어 있는데,
이것은 스레드 풀이 아니고 단순히 스레드를 만들어내는 역할이라서 스레드 풀을 만들어서 사용하는것이 바람직하다.

threadPoolTaskExecutor 를 빈으로 설정 해서 맞는 설정을 해야 한다.

@Bean("simpleTaskExecutor")
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(5);	// 기본 스레드 수
    taskExecutor.setMaxPoolSize(10);	// 최대 스레드 수
    taskExecutor.setQueueCapacity(100);	// Queue 사이즈
    return taskExecutor;
}

Ack 전달을 각각의 처리가 끝난 후에 발송한다.
@Async 메서드를 사용해서 처리했던 위 예시는 메시지를 처리하기 전에 미리 ACK 를 보냈기 때문에 의도한대로 동작하지 않는다.

왜냐하면 rabbitmq 에서 관리 하는게 아니니 추천되지 않는 방법이다. 

rabbitmq 를 쓰면 그 플랫폼에 맞게 써야하는게 맞다! 

 

3. spring cloud stream maxConcurrency 사용

하지만 spring cloud stream 에서는 maxConcurrency 로 설정 할 수 있다.

해당 옵션을 사용하면 Consumer 쓰레드를 모든 처리가 완료된 이후에 ACK 를 보낸다.

maxConcurrency 를 10으로 두고 동작을 시켜 본결과 위에 언급된것 처럼

첫번째 스레드로 돌다가 10초가 경과된 뒤에 두번째 스레드가 돌면서 점점 늘어나게 된다.

다른 소스 예시를 보다가 prefetch count 를 셋팅할때 RabbitListenerConfig 를 bean 으로 선언하고 Connection Factory 에 prefetch count 를 설정하는 부분이 있다.

There are three connection factories to chose from

  • PooledChannelConnectionFactory
  • ThreadChannelConnectionFactory
  • CachingConnectionFactory
  • PooledChannelConnectionFactory

이 공장에서는 Apache Pool2를 기반으로 단일 연결과 두 개의 채널 풀을 관리한다. 이 팩토리를 사용하려면 Apache commons-pool2 jar가 클래스 경로에 있어야 한다.

  • ThreadChannelConnectionFactory

이 팩토리에서는 단일 연결과 두 개의 스레드 로컬을 관리한다.
하나는 트랜잭션 채널용이고 다른 하나는 비 트랜잭션 채널용이다.

  • CachingConnectionFactory

제공되는 세 번째 구현은 캐싱 연결 팩토리이며, 기본적으로 응용프로그램이 공유할 수 있는 단일 연결 프록시를 설정합니다. CachingConnectionFactory 구현은 이러한 채널의 캐싱을 지원하며, 채널이 트랜잭션인지 여부에 따라 채널에 대한 별도의 캐시를 유지합니다.

나중에 해당 설정이 없어도 된다는 것을 알고 기본 설정에서는 어떤 팩토리로 동작할까 확인 해보니 기본으로 CachingConnectionFactory 로 동작하는것을 알 수 있다.

scale up

서버의 스팩을 올릴때 고려 해야 하는 상황은 prefetch count 와 maxConcurrency 의 수

메모리를 올리면 prefetch count의 값 증가가 필요.

maxConcurrency 수는 스레드기 떄문에 scale up 할때 영향이 있다.

scale out

같은 스팩으로 서버를 여러대를 늘릴때 컨슈머가 2대 일때라고 가정 해본다.

큐에서 데이터가 쌓여있을때 prefetch 기준으로 하나씩 가져오기 때문에 겹치지 않는다(테스트 필요)

설정해야할것은 새로 추가되는 서버의 포트

결론

spring cloud stream maxConcurrency 사용한다

concurrency 는 consumer가 계속 동작되면 concurrency는 점점 더 올라가고

consumer 가 (batchSize * receiveTimeout) milliseconds 만큼 메세지를 받지 않으면

concurrency 가 하나씩 줄어든다~~~~~~