diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAnnotationDrivenConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAnnotationDrivenConfiguration.java index 0a5331e144b..f64a820aecd 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAnnotationDrivenConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitAnnotationDrivenConfiguration.java @@ -76,7 +76,7 @@ class RabbitAnnotationDrivenConfiguration { @ConditionalOnThreading(Threading.VIRTUAL) SimpleRabbitListenerContainerFactoryConfigurer simpleRabbitListenerContainerFactoryConfigurerVirtualThreads() { SimpleRabbitListenerContainerFactoryConfigurer configurer = simpleListenerConfigurer(); - configurer.setTaskExecutor(new VirtualThreadTaskExecutor()); + configurer.setTaskExecutor(new VirtualThreadTaskExecutor("rabbit-simple-")); return configurer; } @@ -105,7 +105,7 @@ class RabbitAnnotationDrivenConfiguration { @ConditionalOnThreading(Threading.VIRTUAL) DirectRabbitListenerContainerFactoryConfigurer directRabbitListenerContainerFactoryConfigurerVirtualThreads() { DirectRabbitListenerContainerFactoryConfigurer configurer = directListenerConfigurer(); - configurer.setTaskExecutor(new VirtualThreadTaskExecutor()); + configurer.setTaskExecutor(new VirtualThreadTaskExecutor("rabbit-direct-")); return configurer; } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java index 4a0483f58ca..55fb759a86f 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java @@ -73,9 +73,9 @@ import org.springframework.pulsar.reader.PulsarReaderContainerProperties; @Import(PulsarConfiguration.class) public class PulsarAutoConfiguration { - private PulsarProperties properties; + private final PulsarProperties properties; - private PulsarPropertiesMapper propertiesMapper; + private final PulsarPropertiesMapper propertiesMapper; PulsarAutoConfiguration(PulsarProperties properties) { this.properties = properties; @@ -158,7 +158,7 @@ public class PulsarAutoConfiguration { containerProperties.setSchemaResolver(schemaResolver); containerProperties.setTopicResolver(topicResolver); if (Threading.VIRTUAL.isActive(environment)) { - containerProperties.setConsumerTaskExecutor(new VirtualThreadTaskExecutor()); + containerProperties.setConsumerTaskExecutor(new VirtualThreadTaskExecutor("pulsar-consumer-")); } this.propertiesMapper.customizeContainerProperties(containerProperties); return new ConcurrentPulsarListenerContainerFactory<>(pulsarConsumerFactory, containerProperties); @@ -189,7 +189,7 @@ public class PulsarAutoConfiguration { PulsarReaderContainerProperties readerContainerProperties = new PulsarReaderContainerProperties(); readerContainerProperties.setSchemaResolver(schemaResolver); if (Threading.VIRTUAL.isActive(environment)) { - readerContainerProperties.setReaderTaskExecutor(new VirtualThreadTaskExecutor()); + readerContainerProperties.setReaderTaskExecutor(new VirtualThreadTaskExecutor("pulsar-reader-")); } this.propertiesMapper.customizeReaderContainerProperties(readerContainerProperties); return new DefaultPulsarReaderContainerFactory<>(pulsarReaderFactory, readerContainerProperties); diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java index 072058f02e2..05f87ad1a67 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java @@ -18,6 +18,7 @@ package org.springframework.boot.autoconfigure.amqp; import java.security.NoSuchAlgorithmException; import java.util.List; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; import javax.net.ssl.SSLSocketFactory; @@ -558,12 +559,34 @@ class RabbitAutoConfigurationTests { @Test @EnabledForJreRange(min = JRE.JAVA_21) - void shouldConfigureVirtualThreads() { + void shouldConfigureVirtualThreadsForSimpleListener() { this.contextRunner.withPropertyValues("spring.threads.virtual.enabled=true").run((context) -> { SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory = context .getBean("rabbitListenerContainerFactory", SimpleRabbitListenerContainerFactory.class); assertThat(rabbitListenerContainerFactory).extracting("taskExecutor") .isInstanceOf(VirtualThreadTaskExecutor.class); + Object taskExecutor = ReflectionTestUtils.getField(rabbitListenerContainerFactory, "taskExecutor"); + Object virtualThread = ReflectionTestUtils.getField(taskExecutor, "virtualThreadFactory"); + Thread threadCreated = ((ThreadFactory) virtualThread).newThread(mock(Runnable.class)); + assertThat(threadCreated.getName()).containsPattern("rabbit-simple-[0-9]+"); + + }); + } + + @Test + @EnabledForJreRange(min = JRE.JAVA_21) + void shouldConfigureVirtualThreadsForDirectListener() { + this.contextRunner.withPropertyValues("spring.threads.virtual.enabled=true").run((context) -> { + DirectRabbitListenerContainerFactoryConfigurer rabbitListenerContainerFactory = context.getBean( + "directRabbitListenerContainerFactoryConfigurer", + DirectRabbitListenerContainerFactoryConfigurer.class); + assertThat(rabbitListenerContainerFactory).extracting("taskExecutor") + .isInstanceOf(VirtualThreadTaskExecutor.class); + Object taskExecutor = ReflectionTestUtils.getField(rabbitListenerContainerFactory, "taskExecutor"); + Object virtualThread = ReflectionTestUtils.getField(taskExecutor, "virtualThreadFactory"); + Thread threadCreated = ((ThreadFactory) virtualThread).newThread(mock(Runnable.class)); + assertThat(threadCreated.getName()).containsPattern("rabbit-direct-[0-9]+"); + }); } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java index 3df54e006fd..995f50c720e 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java @@ -18,6 +18,7 @@ package org.springframework.boot.autoconfigure.pulsar; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import com.github.benmanes.caffeine.cache.Caffeine; @@ -69,6 +70,7 @@ import org.springframework.pulsar.core.PulsarTemplate; import org.springframework.pulsar.core.ReaderBuilderCustomizer; import org.springframework.pulsar.core.SchemaResolver; import org.springframework.pulsar.core.TopicResolver; +import org.springframework.test.util.ReflectionTestUtils; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; @@ -506,6 +508,10 @@ class PulsarAutoConfigurationTests { .getBean(ConcurrentPulsarListenerContainerFactory.class); assertThat(factory.getContainerProperties().getConsumerTaskExecutor()) .isInstanceOf(VirtualThreadTaskExecutor.class); + Object taskExecutor = factory.getContainerProperties().getConsumerTaskExecutor(); + Object virtualThread = ReflectionTestUtils.getField(taskExecutor, "virtualThreadFactory"); + Thread threadCreated = ((ThreadFactory) virtualThread).newThread(mock(Runnable.class)); + assertThat(threadCreated.getName()).containsPattern("pulsar-consumer-[0-9]+"); }); } @@ -561,6 +567,10 @@ class PulsarAutoConfigurationTests { .getBean(DefaultPulsarReaderContainerFactory.class); assertThat(factory.getContainerProperties().getReaderTaskExecutor()) .isInstanceOf(VirtualThreadTaskExecutor.class); + Object taskExecutor = factory.getContainerProperties().getReaderTaskExecutor(); + Object virtualThread = ReflectionTestUtils.getField(taskExecutor, "virtualThreadFactory"); + Thread threadCreated = ((ThreadFactory) virtualThread).newThread(mock(Runnable.class)); + assertThat(threadCreated.getName()).containsPattern("pulsar-reader-[0-9]+"); }); }