Allow to define a custom MessageRecoverer

This commit improves `SimpleRabbitListenerContainerFactoryConfigurer` to
use a custom `MessageConverter`. If such a bean is present, it is used
for the default factory that is auto-configured.

Closes gh-8194
This commit is contained in:
Stephane Nicoll 2017-02-09 10:43:03 +01:00
parent b931f564e6
commit aa49468171
4 changed files with 63 additions and 13 deletions

View File

@ -1,5 +1,5 @@
/*
* Copyright 2012-2016 the original author or authors.
* Copyright 2012-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -20,6 +20,7 @@ import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.RabbitListenerConfigUtils;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
@ -40,11 +41,15 @@ class RabbitAnnotationDrivenConfiguration {
private final ObjectProvider<MessageConverter> messageConverter;
private final ObjectProvider<MessageRecoverer> messageRecoverer;
private final RabbitProperties properties;
RabbitAnnotationDrivenConfiguration(ObjectProvider<MessageConverter> messageConverter,
ObjectProvider<MessageRecoverer> messageRecoverer,
RabbitProperties properties) {
this.messageConverter = messageConverter;
this.messageRecoverer = messageRecoverer;
this.properties = properties;
}
@ -53,6 +58,7 @@ class RabbitAnnotationDrivenConfiguration {
public SimpleRabbitListenerContainerFactoryConfigurer rabbitListenerContainerFactoryConfigurer() {
SimpleRabbitListenerContainerFactoryConfigurer configurer = new SimpleRabbitListenerContainerFactoryConfigurer();
configurer.setMessageConverter(this.messageConverter.getIfUnique());
configurer.setMessageRecoverer(this.messageRecoverer.getIfUnique());
configurer.setRabbitProperties(this.properties);
return configurer;
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2012-2016 the original author or authors.
* Copyright 2012-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -20,6 +20,7 @@ import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties.ListenerRetry;
@ -36,6 +37,8 @@ public final class SimpleRabbitListenerContainerFactoryConfigurer {
private MessageConverter messageConverter;
private MessageRecoverer messageRecoverer;
private RabbitProperties rabbitProperties;
/**
@ -47,6 +50,14 @@ public final class SimpleRabbitListenerContainerFactoryConfigurer {
this.messageConverter = messageConverter;
}
/**
* Set the {@link MessageRecoverer} to use or {@code null} to rely on the default.
* @param messageRecoverer the {@link MessageRecoverer}
*/
void setMessageRecoverer(MessageRecoverer messageRecoverer) {
this.messageRecoverer = messageRecoverer;
}
/**
* Set the {@link RabbitProperties} to use.
* @param rabbitProperties the {@link RabbitProperties}
@ -101,7 +112,9 @@ public final class SimpleRabbitListenerContainerFactoryConfigurer {
builder.maxAttempts(retryConfig.getMaxAttempts());
builder.backOffOptions(retryConfig.getInitialInterval(),
retryConfig.getMultiplier(), retryConfig.getMaxInterval());
builder.recoverer(new RejectAndDontRequeueRecoverer());
MessageRecoverer recoverer = (this.messageRecoverer != null
? this.messageRecoverer : new RejectAndDontRequeueRecoverer());
builder.recoverer(recoverer);
factory.setAdviceChain(builder.build());
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2012-2016 the original author or authors.
* Copyright 2012-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -28,6 +28,7 @@ import org.junit.rules.ExpectedException;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.RabbitListenerConfigUtils;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
@ -38,6 +39,7 @@ import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.support.ValueExpression;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.DirectFieldAccessor;
@ -48,6 +50,7 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.interceptor.MethodInvocationRecoverer;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
@ -291,7 +294,8 @@ public class RabbitAutoConfigurationTests {
@Test
public void testRabbitListenerContainerFactoryWithCustomSettings() {
load(MessageConvertersConfiguration.class,
load(new Class<?>[] { MessageConvertersConfiguration.class,
MessageRecoverersConfiguration.class },
"spring.rabbitmq.listener.retry.enabled:true",
"spring.rabbitmq.listener.retry.maxAttempts:4",
"spring.rabbitmq.listener.retry.initialInterval:2000",
@ -325,6 +329,14 @@ public class RabbitAutoConfigurationTests {
assertThat(adviceChain).isNotNull();
assertThat(adviceChain.length).isEqualTo(1);
dfa = new DirectFieldAccessor(adviceChain[0]);
MessageRecoverer messageRecoverer = this.context.getBean("myMessageRecoverer",
MessageRecoverer.class);
MethodInvocationRecoverer mir = (MethodInvocationRecoverer) dfa
.getPropertyValue("recoverer");
Message message = mock(Message.class);
Exception ex = new Exception("test");
mir.recover(new Object[]{"foo", message}, ex);
verify(messageRecoverer).recover(message, ex);
RetryTemplate retryTemplate = (RetryTemplate) dfa
.getPropertyValue("retryOperations");
assertThat(retryTemplate).isNotNull();
@ -400,17 +412,17 @@ public class RabbitAutoConfigurationTests {
}
private void load(Class<?> config, String... environment) {
this.context = doLoad(new Class<?>[] { config }, environment);
load(new Class<?>[] { config }, environment);
}
private AnnotationConfigApplicationContext doLoad(Class<?>[] configs,
private void load(Class<?>[] configs,
String... environment) {
AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext();
applicationContext.register(configs);
applicationContext.register(RabbitAutoConfiguration.class);
EnvironmentTestUtils.addEnvironment(applicationContext, environment);
applicationContext.refresh();
return applicationContext;
this.context = applicationContext;
}
@Configuration
@ -484,6 +496,22 @@ public class RabbitAutoConfigurationTests {
}
@Configuration
protected static class MessageRecoverersConfiguration {
@Bean
@Primary
public MessageRecoverer myMessageRecoverer() {
return mock(MessageRecoverer.class);
}
@Bean
public MessageRecoverer anotherMessageRecoverer() {
return mock(MessageRecoverer.class);
}
}
@Configuration
@EnableRabbit
protected static class EnableRabbitConfiguration {

View File

@ -4635,8 +4635,9 @@ the broker connection is lost. Retries are disabled by default.
==== Receiving a message
When the Rabbit infrastructure is present, any bean can be annotated with
`@RabbitListener` to create a listener endpoint. If no `RabbitListenerContainerFactory`
has been defined, a default one is configured automatically. If a `MessageConverter`
beans is defined, it is associated automatically to the default factory.
has been defined, a default one is configured automatically. If a `MessageConverter` or
`MessageRecoverer` beans are defined, they are associated automatically to the default
factory.
The following component creates a listener endpoint on the `someQueue` queue:
@ -4699,9 +4700,11 @@ Then you can use in any `@RabbitListener`-annotated method as follows:
}
----
You can enable retries to handle situations where your listener throws an exception.
When retries are exhausted, the message will be rejected and either dropped or routed to a
dead-letter exchange if the broker is configured so. Retries are disabled by default.
You can enable retries to handle situations where your listener throws an exception. By
default `RejectAndDontRequeueRecoverer` is used but you can define a `MessageRecoverer`
of your own. When retries are exhausted, the message will be rejected and either dropped
or routed to a dead-letter exchange if the broker is configured so. Retries are disabled
by default.
IMPORTANT: If retries are not enabled and the listener throws an exception, by default the
delivery will be retried indefinitely. You can modify this behavior in two ways; set the