ecsimsw

Rabbit MQ 재시도 정책과 Dead letter queue 설정 방법 본문

Rabbit MQ 재시도 정책과 Dead letter queue 설정 방법

JinHwan Kim 2023. 10. 27. 02:39

Rabbit MQ 재시도 정책과 Dead letter queue 설정 방법

Pic up 에서 서버간 비동기 통신을 위해 그리고 두 서버의 직접 통신을 피하기 위해 Message queue 로 Rabbit MQ 를 사용하고 있다. Message consumer 가 메시지를 처리하는 도중에 처리 실패하는 경우 큐에서 메시지는 제거되지만 실제 메시지는 처리가 안되는 문제를 해결하기 위한 정책을 고민한다.

 

재시도 정책

Consumer 가 메시지 처리 도중 예외가 발생하는 경우 정책에 따라 Message queue 에 다시 해당 메시지를 추가하도록 한다. RabbitListenerContainerFactory 를 재정의하고 Rabbit listener 가 이를 사용하는 것으로 RetryInterceptorBuilder, PrefetchCount 를 포함한 구체적인 Listener 설정이 가능하다.

 

아래는 파일 제거 메시지를 위한 큐 (fileDeletionQueue)의 container factory 를 정의하는 설정이다.

 

@Bean
public RabbitListenerContainerFactory<SimpleMessageListenerContainer> fileDeletionQueueContainerFactory(ConnectionFactory connectionFactory) {
    var factory = new SimpleRabbitListenerContainerFactory();
    factory.setPrefetchCount(PREFETCH);
    factory.setConnectionFactory(connectionFactory);
    factory.setAdviceChain(RetryInterceptorBuilder.stateless()
        .maxAttempts(MAX_ATTEMPS)
        .backOffOptions(INITIAL_INTERVAL, MULTIPLIER, MAX_INTERVAL)
        .recoverer(new RejectAndDontRequeueRecoverer())
        .build());
    return factory;
}

 

RetryInterceptorBuilder : 재시도 정책을 설정한다.  

- Max attempts = 재시도 최대 횟수 

- Initial interval = 최초 재시도 간격 시간

- Multiplier = 재시도 시간 간격 증가 곱

- Max interval = 최대 재시도 간격 시간

 

만약 (Max attempts = 5, Initial interval = 1, Multiplier = 3, Max interval = 10) 인 Listener factory 를 사용하는 Rabbit Listener 가 메시지 처리 도중 실패했다면, 최초 1초의 간격 후에 재시도 후 그 간격을 3배수 씩 높여가며 최대 10초의 시간 간격으로 재시도를 수행하게 되고, 재시도는 최대 5회 실시하게 된다.

 

Recover 정책과 Dead letter queue

최대 재시도 횟수를 넘은 메시지를 관리하고 싶다. 재시도 후에도 여전히 처리가 되지 않은 메시지는 처리 로직 외 처리 실패 메시지를 관리할 수 있는 로직이 수행되었으면 한다.     

 

 

예를 들어 해당 메시지 정보와 시도 시각을 로깅하고 수동 처리를 유도하는 로직이 수행될 수 있을 것 같다. 이런 재시도까지도 마쳤는데 처리되지 못한 메시지를 Dead letter 라고 한다. 이를 관리하기 위한 큐를 설정하였다.

 

 

1. Dead letter exchange 정의

 

우선 Dead letter 처리를 라우팅할 exchange 를 생성한다. 

 

@Bean
public DirectExchange deadLetterExchange() {
    return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME);
}

 

 

2. Message 헤더에 dead letter routing 정보 삽입

 

처리 큐에 재시도 끝에도 처리되지 않은 메시지가 라우팅될 deadLetterExchange 를 설정한다. 그리고 이 큐의 메시지 헤더에 "x-dead-letter-exchange", "x-dead-letter-routing-key" 를 포함하는 것으로 dead letter 처리 시 어떤 exchange 에서 어떤 라우팅 키로 라우팅 될 것인지에 대한 정보를 포함시킨다. 

@Bean
public Queue fileDeletionQueue() {
    return QueueBuilder.durable(QUEUE_NAME)
        .deadLetterExchange(DEAD_LETTER_EXCHANGE)
        .withArguments(Map.of(
            "x-dead-letter-exchange", DEAD_LETTER_EXCHANGE,
            "x-dead-letter-routing-key", DEAD_LETTER_QUEUE_KEY
        ))
        .build();
}

 

 

3. deadLetterExchange 와 recover 큐, 라우팅 키 바인딩

 

마지막으로 deadLetterExchange 를 통해 라우팅되어 실제 deadLetter 가 처리될 큐와 라우팅 키를 정의할 Binding 을 생성한다.

 

@Bean
public Queue fileDeletionRecoverQueue() {
    return QueueBuilder.durable(RECOVER_QUEUE)
        .build();
}

@Bean
public Binding fileDeletionRecoverQueueBinding() {
    return BindingBuilder
        .bind(RECOVER_QUEUE)
        .to(DEAD_LETTER_EXCHANGE)
        .with(RECOVER_QUEUE_KEY);
}

 

그리고 이 dead letter queue 의 listener 를 선언하는 것으로 재시도 끝에도 처리되지 못한 메시지를 처리할 로직을 선언 할 수 있다.

 

@RabbitListener(queues = "${mq.file.deletion.recover.queue.name}")
public void deleteAllRecover(Message failedMessage) {
    LOGGER.error("dead letter from file deletion queue \n" +
        "body : " + failedMessage.getPayload());
}

 

Consumer server down / ACK timeout

만약 Consumer가 message를 가져가긴 했으나 중간에 서버가 아예 다운되어서 메시지가 유실 되는 상황을 걱정하진 않아도 된다. 기본적으로 Consumer는 처리 완료되는 경우 ACK 를 MQ 에 응답해 처리가 제대로 되었음을 확인시킨다. 만약 이 ACK가 오지 않는 경우 MQ는 해당 메시지가 처리되지 않았음을 알 수 있다.

 

그럼 ACK를 언제까지 기다릴지도 설정해야할 것이다. Consumer application이 제대로 종료되지 않아 ACK 또는 NACK로 MQ에 처리 결과를 알려주지 않을 경우 MQ는 무한정 이 Consumer를 기다릴 수만은 없다. 이는 rabbitMQ config 에서 consumer_timeout 을 키로 설정할 수 있다. 다른 설정이 없을 경우 기본적으론 30분이 Default 값이다.

 

 

 

Comments