From 02c49b02870f15a9ed58f0707fea83b3b01841bf Mon Sep 17 00:00:00 2001 From: Andy Wilkinson Date: Tue, 17 Oct 2023 13:31:33 +0100 Subject: [PATCH] When virtual threads are enabled, configure Pulsar to use them Closes gh-36347 --- .../pulsar/PulsarAutoConfiguration.java | 13 +++++- .../pulsar/PulsarAutoConfigurationTests.java | 45 +++++++++++++++++++ 2 files changed, 56 insertions(+), 2 deletions(-) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java index 9ed6ae3b09e..c60565b5918 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java @@ -31,10 +31,13 @@ import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.autoconfigure.thread.Threading; import org.springframework.boot.util.LambdaSafe; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; +import org.springframework.core.env.Environment; +import org.springframework.core.task.VirtualThreadTaskExecutor; import org.springframework.pulsar.annotation.EnablePulsar; import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory; import org.springframework.pulsar.config.DefaultPulsarReaderContainerFactory; @@ -149,10 +152,13 @@ public class PulsarAutoConfiguration { @ConditionalOnMissingBean(name = "pulsarListenerContainerFactory") ConcurrentPulsarListenerContainerFactory pulsarListenerContainerFactory( PulsarConsumerFactory pulsarConsumerFactory, SchemaResolver schemaResolver, - TopicResolver topicResolver) { + TopicResolver topicResolver, Environment environment) { PulsarContainerProperties containerProperties = new PulsarContainerProperties(); containerProperties.setSchemaResolver(schemaResolver); containerProperties.setTopicResolver(topicResolver); + if (Threading.VIRTUAL.isActive(environment)) { + containerProperties.setConsumerTaskExecutor(new VirtualThreadTaskExecutor()); + } this.propertiesMapper.customizeContainerProperties(containerProperties); return new ConcurrentPulsarListenerContainerFactory<>(pulsarConsumerFactory, containerProperties); } @@ -178,9 +184,12 @@ public class PulsarAutoConfiguration { @Bean @ConditionalOnMissingBean(name = "pulsarReaderContainerFactory") DefaultPulsarReaderContainerFactory pulsarReaderContainerFactory(PulsarReaderFactory pulsarReaderFactory, - SchemaResolver schemaResolver) { + SchemaResolver schemaResolver, Environment environment) { PulsarReaderContainerProperties readerContainerProperties = new PulsarReaderContainerProperties(); readerContainerProperties.setSchemaResolver(schemaResolver); + if (Threading.VIRTUAL.isActive(environment)) { + readerContainerProperties.setReaderTaskExecutor(new VirtualThreadTaskExecutor()); + } this.propertiesMapper.customizeReaderContainerProperties(readerContainerProperties); return new DefaultPulsarReaderContainerFactory<>(pulsarReaderFactory, readerContainerProperties); } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java index 16e66c9a473..7e56f4129ea 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java @@ -29,6 +29,8 @@ import org.apache.pulsar.client.api.interceptor.ProducerInterceptor; import org.apache.pulsar.common.schema.SchemaType; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledForJreRange; +import org.junit.jupiter.api.condition.JRE; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -39,6 +41,7 @@ import org.springframework.boot.test.context.runner.ApplicationContextRunner; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.annotation.Order; +import org.springframework.core.task.VirtualThreadTaskExecutor; import org.springframework.pulsar.annotation.PulsarBootstrapConfiguration; import org.springframework.pulsar.annotation.PulsarListenerAnnotationBeanPostProcessor; import org.springframework.pulsar.annotation.PulsarReaderAnnotationBeanPostProcessor; @@ -464,6 +467,27 @@ class PulsarAutoConfigurationTests { .hasFieldOrPropertyWithValue("containerProperties.observationEnabled", false)); } + @Test + @EnabledForJreRange(min = JRE.JAVA_21) + void whenVirtualThreadsAreEnabledOnJava21AndLaterListenerContainerShouldUseVirtualThreads() { + this.contextRunner.withPropertyValues("spring.threads.virtual.enabled=true").run((context) -> { + ConcurrentPulsarListenerContainerFactory factory = context + .getBean(ConcurrentPulsarListenerContainerFactory.class); + assertThat(factory.getContainerProperties().getConsumerTaskExecutor()) + .isInstanceOf(VirtualThreadTaskExecutor.class); + }); + } + + @Test + @EnabledForJreRange(max = JRE.JAVA_20) + void whenVirtualThreadsAreEnabledOnJava20AndEarlierListenerContainerShouldNotUseVirtualThreads() { + this.contextRunner.withPropertyValues("spring.threads.virtual.enabled=true").run((context) -> { + ConcurrentPulsarListenerContainerFactory factory = context + .getBean(ConcurrentPulsarListenerContainerFactory.class); + assertThat(factory.getContainerProperties().getConsumerTaskExecutor()).isNull(); + }); + } + } @Nested @@ -498,6 +522,27 @@ class PulsarAutoConfigurationTests { }); } + @Test + @EnabledForJreRange(min = JRE.JAVA_21) + void whenVirtualThreadsAreEnabledOnJava21AndLaterReaderShouldUseVirtualThreads() { + this.contextRunner.withPropertyValues("spring.threads.virtual.enabled=true").run((context) -> { + DefaultPulsarReaderContainerFactory factory = context + .getBean(DefaultPulsarReaderContainerFactory.class); + assertThat(factory.getContainerProperties().getReaderTaskExecutor()) + .isInstanceOf(VirtualThreadTaskExecutor.class); + }); + } + + @Test + @EnabledForJreRange(max = JRE.JAVA_20) + void whenVirtualThreadsAreEnabledOnJava20AndEarlierReaderShouldNotUseVirtualThreads() { + this.contextRunner.withPropertyValues("spring.threads.virtual.enabled=true").run((context) -> { + DefaultPulsarReaderContainerFactory factory = context + .getBean(DefaultPulsarReaderContainerFactory.class); + assertThat(factory.getContainerProperties().getReaderTaskExecutor()).isNull(); + }); + } + @TestConfiguration(proxyBeanMethods = false) static class ReaderBuilderCustomizersConfig {