From a7d88b69d422bd0f8c657007cfcfc5bde3c42332 Mon Sep 17 00:00:00 2001 From: Moritz Halbritter Date: Tue, 12 Dec 2023 14:55:50 +0100 Subject: [PATCH] Add RabbitMQ properties to enable observations Observations can be enabled for the simple, direct and stream listener and on the RabbitTemplate. Closes gh-36451 --- ...bitListenerContainerFactoryConfigurer.java | 1 + .../autoconfigure/amqp/RabbitProperties.java | 26 +++++++++++++++++++ .../amqp/RabbitStreamConfiguration.java | 5 +++- .../amqp/RabbitTemplateConfigurer.java | 1 + .../amqp/RabbitAutoConfigurationTests.java | 19 ++++++++++++-- .../amqp/RabbitStreamConfigurationTests.java | 11 ++++++++ 6 files changed, 60 insertions(+), 3 deletions(-) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/AbstractRabbitListenerContainerFactoryConfigurer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/AbstractRabbitListenerContainerFactoryConfigurer.java index bcfe04a6748..c60f98999e6 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/AbstractRabbitListenerContainerFactoryConfigurer.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/AbstractRabbitListenerContainerFactoryConfigurer.java @@ -134,6 +134,7 @@ public abstract class AbstractRabbitListenerContainerFactoryConfigurer builder = (retryConfig.isStateless()) ? RetryInterceptorBuilder.stateless() diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java index 4279fb30df0..4c0081ec70a 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java @@ -723,6 +723,11 @@ public class RabbitProperties { */ private boolean autoStartup = true; + /** + * Whether to enable observation. + */ + private boolean observationEnabled; + public boolean isAutoStartup() { return this.autoStartup; } @@ -731,6 +736,14 @@ public class RabbitProperties { this.autoStartup = autoStartup; } + public boolean isObservationEnabled() { + return this.observationEnabled; + } + + public void setObservationEnabled(boolean observationEnabled) { + this.observationEnabled = observationEnabled; + } + } public abstract static class AmqpContainer extends BaseContainer { @@ -996,6 +1009,11 @@ public class RabbitProperties { */ private String defaultReceiveQueue; + /** + * Whether to enable observation. + */ + private boolean observationEnabled; + public Retry getRetry() { return this.retry; } @@ -1048,6 +1066,14 @@ public class RabbitProperties { this.defaultReceiveQueue = defaultReceiveQueue; } + public boolean isObservationEnabled() { + return this.observationEnabled; + } + + public void setObservationEnabled(boolean observationEnabled) { + this.observationEnabled = observationEnabled; + } + } public static class Retry { diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfiguration.java index 569cdb2bf66..a94911e94e1 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfiguration.java @@ -25,6 +25,7 @@ import com.rabbitmq.stream.EnvironmentBuilder; import org.springframework.amqp.rabbit.config.ContainerCustomizer; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.ObjectProvider; +import org.springframework.boot.autoconfigure.amqp.RabbitProperties.StreamContainer; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -57,7 +58,9 @@ class RabbitStreamConfiguration { ObjectProvider> containerCustomizer) { StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory( rabbitStreamEnvironment); - factory.setNativeListener(properties.getListener().getStream().isNativeListener()); + StreamContainer stream = properties.getListener().getStream(); + factory.setObservationEnabled(stream.isObservationEnabled()); + factory.setNativeListener(stream.isNativeListener()); consumerCustomizer.ifUnique(factory::setConsumerCustomizer); containerCustomizer.ifUnique(factory::setContainerCustomizer); return factory; diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitTemplateConfigurer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitTemplateConfigurer.java index 6d20e66c7b4..6ade448ebb3 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitTemplateConfigurer.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitTemplateConfigurer.java @@ -101,6 +101,7 @@ public class RabbitTemplateConfigurer { map.from(templateProperties::getExchange).to(template::setExchange); map.from(templateProperties::getRoutingKey).to(template::setRoutingKey); map.from(templateProperties::getDefaultReceiveQueue).whenNonNull().to(template::setDefaultReceiveQueue); + map.from(templateProperties::isObservationEnabled).to(template::setObservationEnabled); } private boolean determineMandatoryFlag() { diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java index de41da0fe10..64e6a0f26be 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java @@ -29,6 +29,7 @@ import com.rabbitmq.client.impl.CredentialsProvider; import com.rabbitmq.client.impl.CredentialsRefreshService; import com.rabbitmq.client.impl.DefaultCredentialsProvider; import org.aopalliance.aop.Advice; +import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.EnabledForJreRange; import org.junit.jupiter.api.condition.JRE; @@ -371,6 +372,16 @@ class RabbitAutoConfigurationTests { }); } + @Test + void shouldConfigureObservationEnabledOnTemplate() { + this.contextRunner.withUserConfiguration(TestConfiguration.class) + .withPropertyValues("spring.rabbitmq.template.observation-enabled:true") + .run((context) -> { + RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class); + assertThat(rabbitTemplate).extracting("observationEnabled", InstanceOfAssertFactories.BOOLEAN).isTrue(); + }); + } + @Test void testRabbitTemplateDefaultReceiveQueue() { this.contextRunner.withUserConfiguration(TestConfiguration.class) @@ -531,7 +542,8 @@ class RabbitAutoConfigurationTests { "spring.rabbitmq.listener.simple.idleEventInterval:5", "spring.rabbitmq.listener.simple.batchSize:20", "spring.rabbitmq.listener.simple.missingQueuesFatal:false", - "spring.rabbitmq.listener.simple.force-stop:true") + "spring.rabbitmq.listener.simple.force-stop:true", + "spring.rabbitmq.listener.simple.observation-enabled:true") .run((context) -> { SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory = context .getBean("rabbitListenerContainerFactory", SimpleRabbitListenerContainerFactory.class); @@ -539,6 +551,7 @@ class RabbitAutoConfigurationTests { assertThat(rabbitListenerContainerFactory).hasFieldOrPropertyWithValue("maxConcurrentConsumers", 10); assertThat(rabbitListenerContainerFactory).hasFieldOrPropertyWithValue("batchSize", 20); assertThat(rabbitListenerContainerFactory).hasFieldOrPropertyWithValue("missingQueuesFatal", false); + assertThat(rabbitListenerContainerFactory).hasFieldOrPropertyWithValue("observationEnabled", true); checkCommonProps(context, rabbitListenerContainerFactory); }); } @@ -582,12 +595,14 @@ class RabbitAutoConfigurationTests { "spring.rabbitmq.listener.direct.defaultRequeueRejected:false", "spring.rabbitmq.listener.direct.idleEventInterval:5", "spring.rabbitmq.listener.direct.missingQueuesFatal:true", - "spring.rabbitmq.listener.direct.force-stop:true") + "spring.rabbitmq.listener.direct.force-stop:true", + "spring.rabbitmq.listener.direct.observation-enabled:true") .run((context) -> { DirectRabbitListenerContainerFactory rabbitListenerContainerFactory = context .getBean("rabbitListenerContainerFactory", DirectRabbitListenerContainerFactory.class); assertThat(rabbitListenerContainerFactory).hasFieldOrPropertyWithValue("consumersPerQueue", 5); assertThat(rabbitListenerContainerFactory).hasFieldOrPropertyWithValue("missingQueuesFatal", true); + assertThat(rabbitListenerContainerFactory).hasFieldOrPropertyWithValue("observationEnabled", true); checkCommonProps(context, rabbitListenerContainerFactory); }); } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfigurationTests.java index 95549628d1e..205abcbe0b1 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitStreamConfigurationTests.java @@ -54,6 +54,7 @@ import static org.mockito.Mockito.mock; * @author Gary Russell * @author Andy Wilkinson * @author EddĂș MelĂ©ndez + * @author Moritz Halbritter */ class RabbitStreamConfigurationTests { @@ -88,6 +89,16 @@ class RabbitStreamConfigurationTests { .isTrue()); } + @Test + void shouldConfigureObservations() { + this.contextRunner + .withPropertyValues("spring.rabbitmq.listener.type:stream", + "spring.rabbitmq.listener.stream.observation-enabled:true") + .run((context) -> assertThat(context.getBean(StreamRabbitListenerContainerFactory.class)) + .extracting("observationEnabled", InstanceOfAssertFactories.BOOLEAN) + .isTrue()); + } + @Test void environmentIsAutoConfiguredByDefault() { this.contextRunner.run((context) -> assertThat(context).hasSingleBean(Environment.class));