Add RabbitMQ properties to enable observations

Observations can be enabled for the simple, direct and stream listener
and on the RabbitTemplate.

Closes gh-36451
This commit is contained in:
Moritz Halbritter 2023-12-12 14:55:50 +01:00
parent 93a2b1cda0
commit a7d88b69d4
6 changed files with 60 additions and 3 deletions

View File

@ -134,6 +134,7 @@ public abstract class AbstractRabbitListenerContainerFactoryConfigurer<T extends
if (this.taskExecutor != null) {
factory.setTaskExecutor(this.taskExecutor);
}
factory.setObservationEnabled(configuration.isObservationEnabled());
ListenerRetry retryConfig = configuration.getRetry();
if (retryConfig.isEnabled()) {
RetryInterceptorBuilder<?, ?> builder = (retryConfig.isStateless()) ? RetryInterceptorBuilder.stateless()

View File

@ -723,6 +723,11 @@ public class RabbitProperties {
*/
private boolean autoStartup = true;
/**
* Whether to enable observation.
*/
private boolean observationEnabled;
public boolean isAutoStartup() {
return this.autoStartup;
}
@ -731,6 +736,14 @@ public class RabbitProperties {
this.autoStartup = autoStartup;
}
public boolean isObservationEnabled() {
return this.observationEnabled;
}
public void setObservationEnabled(boolean observationEnabled) {
this.observationEnabled = observationEnabled;
}
}
public abstract static class AmqpContainer extends BaseContainer {
@ -996,6 +1009,11 @@ public class RabbitProperties {
*/
private String defaultReceiveQueue;
/**
* Whether to enable observation.
*/
private boolean observationEnabled;
public Retry getRetry() {
return this.retry;
}
@ -1048,6 +1066,14 @@ public class RabbitProperties {
this.defaultReceiveQueue = defaultReceiveQueue;
}
public boolean isObservationEnabled() {
return this.observationEnabled;
}
public void setObservationEnabled(boolean observationEnabled) {
this.observationEnabled = observationEnabled;
}
}
public static class Retry {

View File

@ -25,6 +25,7 @@ import com.rabbitmq.stream.EnvironmentBuilder;
import org.springframework.amqp.rabbit.config.ContainerCustomizer;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties.StreamContainer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@ -57,7 +58,9 @@ class RabbitStreamConfiguration {
ObjectProvider<ContainerCustomizer<StreamListenerContainer>> containerCustomizer) {
StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory(
rabbitStreamEnvironment);
factory.setNativeListener(properties.getListener().getStream().isNativeListener());
StreamContainer stream = properties.getListener().getStream();
factory.setObservationEnabled(stream.isObservationEnabled());
factory.setNativeListener(stream.isNativeListener());
consumerCustomizer.ifUnique(factory::setConsumerCustomizer);
containerCustomizer.ifUnique(factory::setContainerCustomizer);
return factory;

View File

@ -101,6 +101,7 @@ public class RabbitTemplateConfigurer {
map.from(templateProperties::getExchange).to(template::setExchange);
map.from(templateProperties::getRoutingKey).to(template::setRoutingKey);
map.from(templateProperties::getDefaultReceiveQueue).whenNonNull().to(template::setDefaultReceiveQueue);
map.from(templateProperties::isObservationEnabled).to(template::setObservationEnabled);
}
private boolean determineMandatoryFlag() {

View File

@ -29,6 +29,7 @@ import com.rabbitmq.client.impl.CredentialsProvider;
import com.rabbitmq.client.impl.CredentialsRefreshService;
import com.rabbitmq.client.impl.DefaultCredentialsProvider;
import org.aopalliance.aop.Advice;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledForJreRange;
import org.junit.jupiter.api.condition.JRE;
@ -371,6 +372,16 @@ class RabbitAutoConfigurationTests {
});
}
@Test
void shouldConfigureObservationEnabledOnTemplate() {
this.contextRunner.withUserConfiguration(TestConfiguration.class)
.withPropertyValues("spring.rabbitmq.template.observation-enabled:true")
.run((context) -> {
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
assertThat(rabbitTemplate).extracting("observationEnabled", InstanceOfAssertFactories.BOOLEAN).isTrue();
});
}
@Test
void testRabbitTemplateDefaultReceiveQueue() {
this.contextRunner.withUserConfiguration(TestConfiguration.class)
@ -531,7 +542,8 @@ class RabbitAutoConfigurationTests {
"spring.rabbitmq.listener.simple.idleEventInterval:5",
"spring.rabbitmq.listener.simple.batchSize:20",
"spring.rabbitmq.listener.simple.missingQueuesFatal:false",
"spring.rabbitmq.listener.simple.force-stop:true")
"spring.rabbitmq.listener.simple.force-stop:true",
"spring.rabbitmq.listener.simple.observation-enabled:true")
.run((context) -> {
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory = context
.getBean("rabbitListenerContainerFactory", SimpleRabbitListenerContainerFactory.class);
@ -539,6 +551,7 @@ class RabbitAutoConfigurationTests {
assertThat(rabbitListenerContainerFactory).hasFieldOrPropertyWithValue("maxConcurrentConsumers", 10);
assertThat(rabbitListenerContainerFactory).hasFieldOrPropertyWithValue("batchSize", 20);
assertThat(rabbitListenerContainerFactory).hasFieldOrPropertyWithValue("missingQueuesFatal", false);
assertThat(rabbitListenerContainerFactory).hasFieldOrPropertyWithValue("observationEnabled", true);
checkCommonProps(context, rabbitListenerContainerFactory);
});
}
@ -582,12 +595,14 @@ class RabbitAutoConfigurationTests {
"spring.rabbitmq.listener.direct.defaultRequeueRejected:false",
"spring.rabbitmq.listener.direct.idleEventInterval:5",
"spring.rabbitmq.listener.direct.missingQueuesFatal:true",
"spring.rabbitmq.listener.direct.force-stop:true")
"spring.rabbitmq.listener.direct.force-stop:true",
"spring.rabbitmq.listener.direct.observation-enabled:true")
.run((context) -> {
DirectRabbitListenerContainerFactory rabbitListenerContainerFactory = context
.getBean("rabbitListenerContainerFactory", DirectRabbitListenerContainerFactory.class);
assertThat(rabbitListenerContainerFactory).hasFieldOrPropertyWithValue("consumersPerQueue", 5);
assertThat(rabbitListenerContainerFactory).hasFieldOrPropertyWithValue("missingQueuesFatal", true);
assertThat(rabbitListenerContainerFactory).hasFieldOrPropertyWithValue("observationEnabled", true);
checkCommonProps(context, rabbitListenerContainerFactory);
});
}

View File

@ -54,6 +54,7 @@ import static org.mockito.Mockito.mock;
* @author Gary Russell
* @author Andy Wilkinson
* @author Eddú Meléndez
* @author Moritz Halbritter
*/
class RabbitStreamConfigurationTests {
@ -88,6 +89,16 @@ class RabbitStreamConfigurationTests {
.isTrue());
}
@Test
void shouldConfigureObservations() {
this.contextRunner
.withPropertyValues("spring.rabbitmq.listener.type:stream",
"spring.rabbitmq.listener.stream.observation-enabled:true")
.run((context) -> assertThat(context.getBean(StreamRabbitListenerContainerFactory.class))
.extracting("observationEnabled", InstanceOfAssertFactories.BOOLEAN)
.isTrue());
}
@Test
void environmentIsAutoConfiguredByDefault() {
this.contextRunner.run((context) -> assertThat(context).hasSingleBean(Environment.class));