티스토리 뷰
최근에 운영중인 서비스에 RabbitMQ 를 적용하여 메시징큐 시스템을 도입하여 서버의 장애 시 전파를 방지하기 위한 방법으로 사용하였습니다.
왜 메시지 큐를 사용할까요?
Request Data 를 핸들링 하기 위해서 RESTful의 데이터를 수용하는 API 서버보다 더 상위에 위치하여 클라이언트의 요청을 저장하는 별도의 메시징 큐 시스템을 도입하여 사용자로부터 요청을 저장하고 사용하는 방법이 있습니다.
Request 가 폭주하여 큐 시스템이 다운되어도 API 시스템이 다운되는 것을 방지할 수 있기 때문입니다.
메시지 큐는 아래와 같은 장점이 있습니다.
- 비동기: Queue에 저장되기 때문에 차후 처리가 용이합니다. 다중 애플리케이션과 비동기 통신이 가능합니다.
- 비동조: 운영하고 있는 애플리케이션과 별도로 관리가 가능합니다.
- 탄력성: 일부가 실패하면 전부 롤백이 되는 트랜잭션의 개념이 아닙니다.
- 과잉: 실패하여도 재시도가 가능합니다.
- 보증: 작업이 처리된 결과를 확인하기 용이합니다.
- 확장성: 다수의 프로세스들이 큐에 메시지를 전송할 수 있습니다.
- 다른 곳의 API로부터 데이터 송수신이 가능합니다.
- 이메일 발송이나 파일 업로드도 가능합니다.
- 다량의 프로세스를 처리할 수 있습니다.
이전에 사용해본 경험이 있는 RabbitMQ 시스템은 Ubuntu OS 에서 적용해본 사항입니다. RabbitMQ 는 Java 언어와의 호환성도 우수하여 사용하는 방법도 크게 어렵지 않았던 것으로 기억합니다.
테스트 실행 환경
Ubuntu OS 20.04
Architecture: ARM
vCPUs : 2
CPU : 2.5GHz
RAM : 2 GiB
가장 먼저 RabbitMQ 를 설치합니다.
$ apt-get install rabbitmq-server
다음으로 모니터링을 편하게 하기 위해 GUI 모드를 활성화시킵니다. 단, 이 부분은 아래와 같이 서버의 특성에 따라 권한적인 부분이 필요하므로 sudo 명령어를 사용하여 실행할 수도 있습니다.
[error] Error when reading ./.erlang.cookie: eacces
$ rabbitmq-plugins enable rabbitmq_management
활성화된 플러그인이 제대로 켜져있는지 확인하는 명령어입니다.
$ rabbitmq-plugins list
그리고 옵션에 따라 RabbitMQ 를 실행하는 방법이 다른데, 현재 SSH 연결되어 있는 세션에 종속되어 실행시키기 위해서는 아래와 같이 재시작시킵니다.
$ rabbitmq-server restart
백그라운드에서 데몬으로 실행시키기 위해서는 아래와 같이 실행합니다.
$ service rabbitmq-server restart
실행 옵션의 경우 가장 마지막에 붙이게 되며 다음과 같이 동작합니다.
start ## 서버 실행
restart ## 서버 재실행
stop ## 서버 정지
서비스가 정상적으로 실행되었는지 체크하기 위해서는 아래의 명령어를 입력하여 + 표시가 되어 있는 서비스를 확인하면 됩니다.
$ service --status-all
RabbitMQ의 GUI 모드는 아이디/패스워드가 기본적으로 guest 와 같이 생성되지만 외부에서 접근할 수 없으므로 새롭게 아이디를 생성해주고 권한을 적용해주는 옵션을 추가합니다.
$ rabbitmqctl add_user {id} {password}
$ rabbitmqctl set_user_tags {id} administrator
RabbitMQ 의 동작방식
RabbitMQ는 클라이언트로부터의 요청을 메시지의 양식으로 Queue에 담아두게 됩니다.
저장된 메시지들을 정렬하여 적절한 큐 또는 다른 곳으로 이동시키는 작업을 Exchange 라고 하며, 이렇게 되면 Binding 과 메시지를 매칭시키기 위한 라우팅 알고리즘을 Exchange Type 이라고 합니다.
그리고 이는 메시지를 생산하는 Publisher 의 개념이 되게 되는데, 해당 Queue 에 담긴 메시지를 구독(Subscribe)하는 방법으로 메시지를 소비(Consume) 하게 되는 방법으로 작동합니다.
구독자(Subscriber)는 Publisher가 생산한 메시지에 접근하는데 필요한 키를 Routing Key라고 하며, 송신한 메시지 헤더에 포함됩니다.
RabbitMQ 의 Exchange Type
RabbitMQ 는 AMQP(Advanced Message Queuing Protocol)을 지원하기 때문에 메시징 큐의 오픈소스 기반 표준 프로토콜을 준수합니다.
위에서 언급하였지만 Exchange Type 은 Routing Key 에 기반한 3개의 라우팅 알고리즘과 key - value 헤더에 근거한 1개 유형의 Type 이 있습니다.
Direct Exchange
메시지의 라우팅 키를 큐에 1:1 로 매칭시키는 방법입니다. 간단하게 사용하기 용이합니다.
Fanout Exchange
메시지의 라우팅 키를 무시하고 Exchange 에 바인딩 된 모든 큐에 메시지를 전송합니다. 1:N 의 연결 구조를 가집니다.
Topic exchange
Exchange 에 바인딩 된 큐 중 라우팅 키가 패턴에 맞는 큐에게 모두 메시지를 전송합니다. 멀티캐스트 방식이라고도 합니다.
Headers exchange
라우팅 키 대신 메시지 헤더에 담겨있는 속성(key - value)들에 근거하여 큐로 메시지를 전송합니다.
Spring Framework 에서 RabbitMQ 적용하기
- 의존성을 추가해줍니다.
compile 'org.springframework.amqp:spring-rabbit:2.4.3'
아래와 같이 Configuration 파일을 생성하고 서비스에 적용합니다.
RabbitConfiguration
import kr.co.energyx.xquare.core.message.RabbitMessageListener;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 모든 메소드는 @Bean 적용이 되어 Spring 에 자동으로 Bean 주입이 됩니다.
*/
@Configuration
@EnableRabbit
public class RabbitConfiguration {
public static final String serviceName = "service";
public static final String topicExchangeName = "testExchange";
public static final String queueName = "testQueue";
/**
* 큐를 빈에 주입합니다.
* @return
*/
@Bean
Queue queue() {
return new Queue(queueName, false);
}
/**
* 토픽 Exchange 를 지정합니다.
* @return
*/
@Bean
TopicExchange exchange() {
return new TopicExchange(topicExchangeName);
}
/**
* 큐와 Exchange Type 을 바인딩 해줍니다.
* 여기서 with 에 들어가는 키는 라우팅 키 값으로 Ant Matcher 와 같게 적용됩니다.
* @param queue
* @param exchange
* @return
*/
@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(
new StringBuilder().append(serviceName).append(".#").toString());
}
/**
* 메시지 컨테이너를 등록합니다.
* @param connectionFactory
* @param listenerAdapter
* @return
*/
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueName);
container.setMessageListener(listenerAdapter);
return container;
}
/**
* 메시지를 받을 수 있는 리스너를 등록합니다.
* @param receiver
* @return
*/
@Bean
MessageListenerAdapter listenerAdapter(RabbitReceiver receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}
}
다음으로 작성할 내용은 메세지를 수신하는 클래스입니다.
RabbitReceiver.java
import java.text.MessageFormat;
import java.util.concurrent.CountDownLatch;
@Component
@Getter
public class RabbitReceiver {
// CountDownLatch 는 메시지 수신 여부를 알려줍니다.
private CountDownLatch latch = new CountDownLatch();
public void receiveMessage(String message) {
System.out.println(MessageFormat.format("Received Message : {0}", message));
latch.countDown();
}
}
메시지를 발송하는 테스트 클래스입니다.
import java.util.concurrent.TimeUnit;
@SpringBootTest
@ActiveProfiles("test")
public class RabbitMessageSendTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private RabbitReceiver receiver;
@Autowired
private RabbitConfiguration rabbitConfiguration;
private static final String ROUTER_KEY = "service";
@Test
public void sendToRabbitMessage() {
rabbitTemplate.convertAndSend(rabbitConfiguration.topicExchangeName,
ROUTER_KEY, "Send Message Test!");
receiver.getLatch().await(1, TimeUnit.SECONDS);
}
}
테스트 발송 후 RabbitMQ GUI 페이지에서 확인할 수 있습니다.
'Server' 카테고리의 다른 글
Docker의 기본적인 개념 (0) | 2022.04.16 |
---|---|
[Java] JUnit 테스트에서 LazyLoading 일때 발생하는 에러 해결 (1) | 2022.04.15 |
[Java] 컴파일 후 실행하면 나는 "jps.track.ap.dependencies" 에러 해결 (2) | 2022.04.12 |
Jenkins 글로벌 변수 설정하기 (1) | 2022.03.14 |
Spring Batch 실행 명령어 (0) | 2022.03.13 |
Ktor 프레임워크 공부하기 (1) | 2022.03.12 |