티스토리 뷰

반응형

최근에 운영중인 서비스에 RabbitMQ 를 적용하여 메시징큐 시스템을 도입하여 서버의 장애 시 전파를 방지하기 위한 방법으로 사용하였습니다. 

 

왜 메시지 큐를 사용할까요?

Request Data 를 핸들링 하기 위해서 RESTful의 데이터를 수용하는 API 서버보다 더 상위에 위치하여 클라이언트의 요청을 저장하는 별도의 메시징 큐 시스템을 도입하여 사용자로부터 요청을 저장하고 사용하는 방법이 있습니다.

Request 가 폭주하여 큐 시스템이 다운되어도 API 시스템이 다운되는 것을 방지할 수 있기 때문입니다.

메시지 큐는 아래와 같은 장점이 있습니다.

  • 비동기: Queue에 저장되기 때문에 차후 처리가 용이합니다. 다중 애플리케이션과 비동기 통신이 가능합니다.
  • 비동조: 운영하고 있는 애플리케이션과 별도로 관리가 가능합니다.
  • 탄력성: 일부가 실패하면 전부 롤백이 되는 트랜잭션의 개념이 아닙니다.
  • 과잉: 실패하여도 재시도가 가능합니다.
  • 보증: 작업이 처리된 결과를 확인하기 용이합니다.
  • 확장성: 다수의 프로세스들이 큐에 메시지를 전송할 수 있습니다.
  • 다른 곳의 API로부터 데이터 송수신이 가능합니다.
  • 이메일 발송이나 파일 업로드도 가능합니다.
  • 다량의 프로세스를 처리할 수 있습니다.

 


 

이전에 사용해본 경험이 있는 RabbitMQ 시스템은 Ubuntu OS 에서 적용해본 사항입니다. RabbitMQ 는 Java 언어와의 호환성도 우수하여 사용하는 방법도 크게 어렵지 않았던 것으로 기억합니다.

테스트 실행 환경

Ubuntu OS 20.04
Architecture: ARM
vCPUs : 2 
CPU : 2.5GHz
RAM : 2 GiB

가장 먼저 RabbitMQ 를 설치합니다.

$ apt-get install rabbitmq-server

다음으로 모니터링을 편하게 하기 위해 GUI 모드를 활성화시킵니다. 단, 이 부분은 아래와 같이 서버의 특성에 따라 권한적인 부분이 필요하므로 sudo 명령어를 사용하여 실행할 수도 있습니다.

 [error] Error when reading ./.erlang.cookie: eacces
$ rabbitmq-plugins enable rabbitmq_management

활성화된 플러그인이 제대로 켜져있는지 확인하는 명령어입니다.

$ rabbitmq-plugins list

그리고 옵션에 따라 RabbitMQ 를 실행하는 방법이 다른데, 현재 SSH 연결되어 있는 세션에 종속되어 실행시키기 위해서는 아래와 같이 재시작시킵니다.

$ rabbitmq-server restart

백그라운드에서 데몬으로 실행시키기 위해서는 아래와 같이 실행합니다.

$ service rabbitmq-server restart

실행 옵션의 경우 가장 마지막에 붙이게 되며 다음과 같이 동작합니다.

start ## 서버 실행
restart ## 서버 재실행
stop ## 서버 정지

서비스가 정상적으로 실행되었는지 체크하기 위해서는 아래의 명령어를 입력하여 + 표시가 되어 있는 서비스를 확인하면 됩니다.

$ service --status-all

RabbitMQ의 GUI 모드는 아이디/패스워드가 기본적으로 guest 와 같이 생성되지만 외부에서 접근할 수 없으므로 새롭게 아이디를 생성해주고 권한을 적용해주는 옵션을 추가합니다.

$ rabbitmqctl add_user {id} {password}
$ rabbitmqctl set_user_tags {id} administrator

 

RabbitMQ 의 동작방식

RabbitMQ는 클라이언트로부터의 요청을 메시지의 양식으로 Queue에 담아두게 됩니다.

저장된 메시지들을 정렬하여 적절한 큐 또는 다른 곳으로 이동시키는 작업을 Exchange 라고 하며, 이렇게 되면 Binding 과 메시지를 매칭시키기 위한 라우팅 알고리즘을 Exchange Type 이라고 합니다.

그리고 이는 메시지를 생산하는 Publisher 의 개념이 되게 되는데, 해당 Queue 에 담긴 메시지를 구독(Subscribe)하는 방법으로 메시지를 소비(Consume) 하게 되는 방법으로 작동합니다.

구독자(Subscriber)는 Publisher가 생산한 메시지에 접근하는데 필요한 키를 Routing Key라고 하며, 송신한 메시지 헤더에 포함됩니다.

 

RabbitMQ 의 Exchange Type

RabbitMQ 는 AMQP(Advanced Message Queuing Protocol)을 지원하기 때문에 메시징 큐의 오픈소스 기반 표준 프로토콜을 준수합니다.

위에서 언급하였지만 Exchange Type 은 Routing Key 에 기반한 3개의 라우팅 알고리즘과 key - value 헤더에 근거한 1개 유형의 Type 이 있습니다.

Direct Exchange

메시지의 라우팅 키를 큐에 1:1 로 매칭시키는 방법입니다. 간단하게 사용하기 용이합니다.

Fanout Exchange

메시지의 라우팅 키를 무시하고 Exchange 에 바인딩 된 모든 큐에 메시지를 전송합니다. 1:N 의 연결 구조를 가집니다.

Topic exchange

Exchange 에 바인딩 된 큐 중 라우팅 키가 패턴에 맞는 큐에게 모두 메시지를 전송합니다. 멀티캐스트 방식이라고도 합니다.

Headers exchange

라우팅 키 대신 메시지 헤더에 담겨있는 속성(key - value)들에 근거하여 큐로 메시지를 전송합니다.

 


Spring Framework 에서 RabbitMQ 적용하기

  1. 의존성을 추가해줍니다.
compile 'org.springframework.amqp:spring-rabbit:2.4.3'

아래와 같이 Configuration 파일을 생성하고 서비스에 적용합니다.

RabbitConfiguration

import kr.co.energyx.xquare.core.message.RabbitMessageListener;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 모든 메소드는 @Bean 적용이 되어 Spring 에 자동으로 Bean 주입이 됩니다.
 */
@Configuration
@EnableRabbit
public class RabbitConfiguration {

   public static final String serviceName = "service";
   public static final String topicExchangeName = "testExchange";
   public static final String queueName = "testQueue";

   /**
    * 큐를 빈에 주입합니다.
    * @return
    */
   @Bean
   Queue queue() {
      return new Queue(queueName, false);
   }

   /**
    * 토픽 Exchange 를 지정합니다.
    * @return
    */
   @Bean
   TopicExchange exchange() {
      return new TopicExchange(topicExchangeName);
   }

   /**
    * 큐와 Exchange Type 을 바인딩 해줍니다. 
    * 여기서 with 에 들어가는 키는 라우팅 키 값으로 Ant Matcher 와 같게 적용됩니다.
    * @param queue
    * @param exchange
    * @return
    */
   @Bean
   Binding binding(Queue queue, TopicExchange exchange) {
      return BindingBuilder.bind(queue).to(exchange).with(
              new StringBuilder().append(serviceName).append(".#").toString());
   }

   /**
    * 메시지 컨테이너를 등록합니다.
    * @param connectionFactory
    * @param listenerAdapter
    * @return
    */
   @Bean
   SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
           MessageListenerAdapter listenerAdapter) {
      SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
      container.setConnectionFactory(connectionFactory);
      container.setQueueNames(queueName);
      container.setMessageListener(listenerAdapter);
      return container;
   }

   /**
    * 메시지를 받을 수 있는 리스너를 등록합니다.
    * @param receiver
    * @return
    */
   @Bean
   MessageListenerAdapter listenerAdapter(RabbitReceiver receiver) {
      return new MessageListenerAdapter(receiver, "receiveMessage");
   }
}

다음으로 작성할 내용은 메세지를 수신하는 클래스입니다.

RabbitReceiver.java

import java.text.MessageFormat;
import java.util.concurrent.CountDownLatch;

@Component
@Getter
public class RabbitReceiver {

   // CountDownLatch 는 메시지 수신 여부를 알려줍니다.
   private CountDownLatch latch = new CountDownLatch();

   public void receiveMessage(String message) {
      System.out.println(MessageFormat.format("Received Message : {0}", message));
      latch.countDown();
   }
}

메시지를 발송하는 테스트 클래스입니다.

import java.util.concurrent.TimeUnit;

@SpringBootTest
@ActiveProfiles("test")
public class RabbitMessageSendTests {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private RabbitReceiver receiver;
    @Autowired
    private RabbitConfiguration rabbitConfiguration;
    
    private static final String ROUTER_KEY = "service";

    @Test
    public void sendToRabbitMessage() {
       rabbitTemplate.convertAndSend(rabbitConfiguration.topicExchangeName,
              ROUTER_KEY, "Send Message Test!");
       receiver.getLatch().await(1, TimeUnit.SECONDS);
    }

}

테스트 발송 후 RabbitMQ GUI 페이지에서 확인할 수 있습니다.

반응형
댓글
공지사항