본문 바로가기
기타/RabbitMQ

Java로 RabbitMQ 시작하기

by oneny 2024. 5. 23.

RabbitMQ

RabbitMQ는 메시지 브로커(Message Broker)로서 매우 단순한 애플리케이션부터 대규모 시스템까지 다양한 분산 소프트웨어 아키텍처를 만들기 위한 매우 강력하고 가벼운 도구이다.

 RabbitMQ는 관리자 UI 플러그인과 함꼐 코어 어플리케이션을 구동하는데 40MB 미만의 메모리만 사용하여 가볍고, 이후 큐(Queue)에 전송되는 메시지양이 증가함에 따라 메모리 사용량이 점차 증가한다. 또한, 메시지를 배달하기 전에 디스크 또는 메모리에 저장하거나 클러스터를 설정할 때 큐를 HA(Highly Available)로 설정해서 여러 노드에 걸쳐 저장하도록 옵션을 설정하여 메시지 처리량 혹은 성능을 유연하게 제어하고 안정적으로 메시지를 전달할 수 있다.

 

RabbitMQ 등장 배경

 웹 사이트는 매우 빠르게 성장하여 특정 시간에 활발하게 활동한 사용자에게 혜택을 주는 이벤트를 한다고 가정해보자. 이 때, 다른 핵심 기능을 하는 어플리케이션에서 로그인한 시간을 부분적으로 사용해서 동작했기 때문에 사용자가 웹사이트에 로그인할 때, 여러 데이터베이스 서버에 있는 테이블들에 로그인 시간을 기록해야 했다.

 하지만 위 그림처럼 데이터베이스 업데이트가 차례대로 수행되면서 성능 문제가 급속히 증가했고, 하나의 업데이트 질의가 완료되는 데 평균 50ms 걸렸다면 열개의 어플리케이션을 가진 사용자가 로그

인한다면 마지막 로그인 시간을 업데이트하는 작업에만 0.5초를 추가해야 하고, 사용자의 어플리케이션 리스트의 데이터베이스 서버 중 하나가 느리게 응답하거나 응답하지 않으면 사용자는 더 이상 사이트에 로그인할 수 없는 문제가 발생할 수 있다.

 

 따라서 사용자와 직면한 로그인 요청에서 데이터베이스 업데이트 작업의 의존성을 분리할 필요성이 있다. 따라서 메시지 지향 미들웨어(MOM, Message-oriented-middleware)인 RabbitMQ를 사용하여 느슨하게 결합된 설계로 변경하기 위해 로그인 처리에서 데이터베이스 업데이트 작업을 분리하면 사용자는 로그인 처리 중에 데이터베이스의 업데이트를 기다리지 않아도 되므로 신속하게 로그인할 수 있게 된다.

 위 그림을 살펴보면 사용자가 로그인하면 RabbitMQ에 메시지가 발행되고 어플리케이션은 인증된 회원 페이지로 이동할 수 있다. RabbitMQ는 로그인 이벤트 메시지를 구독하고 있는 모든 소비자에게 발행하고, 각 소비자는 독립적으로 자신의 데이터베이스에 작업을 수행할 수 있다.

 

AMQP(Advanced Message Queueing Protocol)

 RabbitMQ는 AMQP를 기반으로 구현됐지만 MQTT, STOMP, XMPP 등 다양한 프로토콜도 제공한다. AMQP를 구현한 RabbitMQ는 유연한 메시지 라우팅, 메시지 내구성 설정, 데이터센터 간 통신 등 메시지 지향 아키텍처의 복잡한 요구 사항에 대해 플랫폼 독립적인 플랫폼이라고 할 수 있다.

 AMQP 스펙은 HTTP, SMTP와 같은 프로토콜과 달리 네트워크 프로토콜의 정의뿐 아니라 서버 측 서비스와 동작 방식도 정의하는데, AMQ 모델을 살펴보면 확인할 수 있다. AMQ 모델은 메시지 라우팅 동작을 정의하는 메시지 브로커의 세 가지 추상 컴포넌트를 다음과 같이 논리적으로 정의한다.

 

익스체인지(Exchange)

 익스페인치는 메시지 브로커에서 큐에 메시지를 전달하는 컴포넌트이다. 메시지에 적용할 라우팅 동작을 정의하는데, 이는 일반적으로 메시지를 보낼 때 함께 전달한 데이터 속성을 검사하거나 메시지에 포함된 속성을 이용해 처리한다. 익스체인지에서는 다음과 같은 네 가지 방식으로 큐에 메시지를 적재한다.

  • Direct: 라우팅 키에 따라 메시지를 특정 큐로 직접 전달한다.
  • Fanout: 모든 큐에 메시지를 전달한다.
  • Topic: 라우팅 패턴을 이용하여 메시지를 특정 큐로 전달한다.
  • Headers: 메시지의 헤더 정보를 이용하여 메시지를 전달한다.

 

큐(Queue)

 큐는 수신한 메시지를 저장하는 컴포넌트이다. 큐의 설정 정보에는 메시지를 메모리에만 보관하거나 소비자(Cosumer)에게 전달하기 전에 선입 선출(FIFO, First-In First-Out) 순서로 전달하기 전에 메시지를 디스크에 보관하는지가 저장되어 있다.

 

바인딩(Binding)

 바인딩은 익스체인지에서 전달된 메시지가 어떤 큐에 저장돼야 하는지 정의하는 컴포넌트이다. 즉, 익스체인지 중 특정 유형은 익스체인지에 지정한 특정 큐에만 메시지를 전달하도록 메시지를 필터랑하고, 익스체인지에 메시지를 발행할 때 어플리케이션은 라우팅 키(routing-key) 속성을 사용한다. 익스체인지는 적절한 메시지를 큐로 전달하기 위해 메시지의 라우팅 키를 바인딩 키에 맞춰서 평가한다.

 

프로토콜 사용하기

메세지를 큐에 발행하기 전에 몇 가지 설정 단계를 거쳐야 하는데, 최소한 익스체인지와 큐를 설정한 후 둘을 연결해야 한다.

 

익스체인지 선언하기

 

 AMQ 스펙에 익스체인지는 큐와 같이 해당 클래스가 존재한다. Exchange.Declare 명령에 익스체인지의 이름과 유형 그리고 메시지 처리에 사용하는 기타 메타데이터를 인수로 실행해서 익스체인지를 생성한다.

 Exchange.Declare 명령을 전송하면 RabbitMQ는 익스체인지를 생성한 후 Exchange.DeclareOk 메서드 프레임을 응답으로 전송한다. 만약 실패하면 Exchange.Declare가 실패하고 채널이 닫힌 이유를 나타내는 숫자 응답 코드와 텍스트 값을 Channel.Close 명령에 포함시켜 전송하고, Exchchange.Declare 명령이 전송된 채널을 닫는다.

 

큐 선언하기

 익스체인지를 생성한 후 RabbitMQ에 Queue.Declare 명령을 보내 큐를 생성한다. Queue.Declare 명령도 Exchange.Declare 명령과 유사한 통신 절차로 진행되며 Queue.Declare 명령이 실패하면 채널이 닫힌다.

 

큐와 익스체인지 연결하기

 익스체인지와 큐가 생성되면 이 둘을 연결해야 한다. Queue.Declare와 유사하게 큐를 익스체인지에 연결하는 명령인 Queue.Bind는 한 번에 하나의 큐만 지정한다. Exchange.Declare, Queue.Declare, Queue.Bind 명령은 RabbitMQ 서버와 클라이언트 간 RPC의 기본 명령으로 AMQP 스펙에서 다른 동기 방식의 명령도 이 공통 패턴을 따르고 있다.

 

RabbitMQ에 메시지 발행하기

 RabbitMQ에 메시지를 발행할 때 여러 종류의 프레임들이 서버로 전송하는 메시지의 데이터를 캡슐화한다. 실제 메시지 본문을 RabbitMQ에 전달하기 전에 클라이언트 어플리케이션은 Basic.Publish 메서드 프레임, 콘텐츠 헤더 프레임, 하나 이상의 바디 프레임을 전송한다.

 Basic.Publish 메서드 프레임에는 익스체인지의 이름과 라우팅 키가 들어있는데, 이를 RabbitMQ는 익스체인지의 이름을 저장하는 데이터베이스와 비교한다. 그리고 Basic.Properties 메서드 프레임의 익스체인지 이름과 일치하는 익스체인지를 발견한 후에 해당 익스체인지는 내부 바인딩들을 평가하며 라우팅 키와 일치하는 큐를 찾고 찾으면 선입선출 순서로 메시지를 큐에 삽입한다.

 RabbitMQ가 메시지를 전달할 준비가 되면 큐는 이 참조를 사용해 메시지를 마샬링함으로써 클라이언트에 전송한다. 여러 큐에 발행한 메시지는 실질적으로 이 참조를 이용하기 때문에 실제 메모리를 적게 사용하여 최적화된다. 만약 큐를 구독하는 소비자가 없어 메시지가 소비죄디 않으면 메시지를 큐에 저장되고 큐의 크기도 그에 따라 커진다. Basic.Properties에 지정된 배달 모드(delivery mode)에 따라 메시지를 메모리에 보관하거나 디스크에 기록한다.

 

RabbitMQ에서 메시지 소비하기

 


 RabbitMQ의 큐에서 메시지를 소비하기 위해 소비자 어플리케이셔은 Basic.Consume 명령을 실행해서 RabbitMQ의 큐를 구독한다. 다른 동기 방식 명령과 마찬가지로 서버는 Basic.ConsumeOk로 응답해 클라이언ㅌ가 연속해서 메시지를 받을 준비를 하도록 알린다. RabbitMQ의 판단에 따라, 소비자는 응답받기 적당한 형태인 Basic.Deliver 메서즈 프레임과 콘텐츠 헤더 프레임, 바디 프레임으로 메시지를 전달받는다.

 여기서 메시지 발행과 다른 점은 Basic.Consume이 발급되면 특정 상황이 발생하기 전끼지 활성 상태를 유지한다는 것이다. 소비자가 메시지 수신을 중지하려면 Basic.Cancel 명령을 발행해야 한다. RabbitMQ가 계속해서 메시지를 보내는 동안에 명령이 비동기적으로 실행된다.

Basic.Consume 명령의 no_ack 인수는 소비자의 수신 방식을 알수 있는 몇 가지 설정 중 하나다. no_ack를 true로 설정하면 RabbitMQ는 소비자가 Basic.Cancel 명령을 보내거나 연결을 끊을 때까지 계속 메시지를 보낸다. false로 설정하면 소비자는 Basic.Ack RPC 요청을 전송해 수신한 각 메시지를 확인해야 한다.

 

Java 작성하기

설치는 https://www.rabbitmq.com/docs/download에서 참고했고, RabbitMQ Java Client는 https://mvnrepository.com/artifact/com.rabbitmq/amqp-client에서 참고할 수 있다.

 

메시지 발행

public class Send {

    private final static String EXCHANGE_NAME = "logs";
    private final static String QUEUE_NAME = "hello";
    private final static String ROUTING_KEY = "hello";

    public static void main(String[] argv) throws Exception {

        // 연결할 url을 지정하는데 localhost의 기본 가상 호스트(/)에 포트 번호 5672로 연결한다.
        // 이 url은 기본 설정으로 로컬 시스템의 RabbitMQ에 연결된다.
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection(); // 설정한 URL을 사용하여 RabbitMQ에 연결
             Channel channel = connection.createChannel()) { // 연결 성공하면 통신할 수 있는 새로운 채널을 열어야 한다.

            // 채널을 통해 RabbitMQ 서버에 익스체인지(Exchange) 선언(Declare)
            // Exchange.Declare 명령 전송
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

            // 채널을 통해 RabbitMQ 서버에 새로운 큐(Queue) 선언(Declare)
            // Queue.Declare 명령 전송
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);

            // RabbitMQ 서버의 큐와 익스체인지를 연결하기
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

            String message = "Hello World! %s";
            for (int i = 0; i < 10; i++) {
                // 특정 EXCHANGE_NAME에 특정 routingKey
                channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, String.format(message, i).getBytes(StandardCharsets.UTF_8));
            }
        }
    }
}

Connection과 Channel을 사용하면 자원을 닫아줘야 하기 때문에 try-with-resources 문을 사용하여 finally를 통해 명시적으로 닫을 필요 없이 컴파일될 때 누락없이 닫아주도록 코드를 작성했다. 그리고 코드를 자세히 살펴보자.

 

// RabbitMQ 서버의 큐와 익스체인지를 연결하기
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

큐가 생성되고, 메시지를 수신할 수 있도록 큐를 익스체인지에 연결해야 한다. 큐를 익스체인지에 연결하기 위해서는 Channel 클래스의 queueBind 메서드를 호출하고 익스체인지와 라우팅 키를 전달하면 RabbitMQ에 Queue.Bind 명령이 전송된다.

 

 

// 특정 EXCHANGE_NAME에 특정 routingKey
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, String.format(message, i).getBytes(StandardCharsets.UTF_8));

익스체인지와 큐를 만들고 이 둘을 연결하면 RabbitMQ에 hello 큐에 저장할 테스트 메시지를 발행했다. 메시지 본문, 메시지 속성(현재는 null) 등을 인자로 전달하여 basicPublish 메서드를 호출하면 Basic.Publish 메서드 프레임, 콘텐츠 헤더 프레임, 바디 프레임을 만들고 이를 RabbitMQ에 전송한다. 충분한 테스트를 위해 열 개의 테스트 메시지를 큐에 발행하면 아래와 같은 결과를 확인할 수 있다.

 

 

메시지 소비

public class Receive {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
       ConnectionFactory factory = new ConnectionFactory();
       factory.setHost("localhost");

       Connection connection = factory.newConnection(); // RabbitMQ 서버와의 연결을 생성
       Channel channel = connection.createChannel(); // 메시지를 주고받을 채널 생성

       // 큐에 메시지를 수신하면 실행할 콜백을 정의
       DeliverCallback deliverCallback = (consumerTag, delivery) -> {
          String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
          System.out.println(" [x] Received '" + message + "'");
       };

       // 큐에서 메시지 소비
       // autoAck를 true로 설정하여 메시지를 자동으로 확인
       channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}

 메시지와 발생했을 때의 차이는 try-with-resources를 사용하지 않았다는 것이다. 만약 try-with-resources를 사용하여 채널과 연결을 자동으로 닫지 않은 이유는 소비자가 비동기적으로 메시지를 수신하는 동안 프로그램이 계속 실행되어야 하기 때문이다. 그리고 메시지를 수신하면 이를 처리할 콜백 객체를 제공해야 하는데 DeliverCallback 클래스에서 정의할 수 있다.

 그리고 basicConsume 두 번째 인자로 autoAck을 설정할 수 있는데 이를 true로 설정하는 경우에는 메시지를 전달받는 컨슈머가 메시지에 대해 처리 여부와 상관없이 자동으로 메시지를 받았다고 ack를 리턴하기 때문에 큐에서 메시지를 제거된다. 이는 메시지가 즉시 소비되므로 처리 속도가 빨라질 수 있다는 장점이 있지만 메시지가 소비되었으나 처리 도중 예외가 발생하면, 메시지가 이미 확인되었기 때문에 해당 메시지는 재처리할 수 없어 메시지 손실로 이어질 수 있다.

 

// 큐에 메시지를 수신하면 실행할 콜백을 정의
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
    System.out.println(" [x] Received '" + message + "'");

    // RabbitMQ에 메세지 수신 확인 전송
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};

// 큐에서 메시지 소비
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });

 

따라서 메시지 손실이 발생하지 말아야 하는 환경에서라면 autoAck은 false로 설정하고 위 코드처럼 메시지에 대한 처리가 완료된 후 RabbitMQ에 메시지 수신 확인을 전송하는 것이 좋다.

 

메시지 수신 확인을 받은 RabbitMQ는 큐에서 메시지를 제거하여 위와 같은 결과를 얻을 수 있는 것을 확인할 수 있다.

 

 

 

출처

RabbitMQ 공식문서

RabbitMQ In Depth

메시지 큐 사용 이유와 RabbitMQ 주요 옵션, 재앙 시나리오 소개

RabbitMQ - 이해하기