Merge branch '3.2.x'

Closes gh-39961
This commit is contained in:
Moritz Halbritter 2024-03-18 11:41:50 +01:00
commit 5c2f64d677
4 changed files with 40 additions and 7 deletions

View File

@ -76,7 +76,7 @@ class RabbitAnnotationDrivenConfiguration {
@ConditionalOnThreading(Threading.VIRTUAL)
SimpleRabbitListenerContainerFactoryConfigurer simpleRabbitListenerContainerFactoryConfigurerVirtualThreads() {
SimpleRabbitListenerContainerFactoryConfigurer configurer = simpleListenerConfigurer();
configurer.setTaskExecutor(new VirtualThreadTaskExecutor());
configurer.setTaskExecutor(new VirtualThreadTaskExecutor("rabbit-simple-"));
return configurer;
}
@ -105,7 +105,7 @@ class RabbitAnnotationDrivenConfiguration {
@ConditionalOnThreading(Threading.VIRTUAL)
DirectRabbitListenerContainerFactoryConfigurer directRabbitListenerContainerFactoryConfigurerVirtualThreads() {
DirectRabbitListenerContainerFactoryConfigurer configurer = directListenerConfigurer();
configurer.setTaskExecutor(new VirtualThreadTaskExecutor());
configurer.setTaskExecutor(new VirtualThreadTaskExecutor("rabbit-direct-"));
return configurer;
}

View File

@ -73,9 +73,9 @@ import org.springframework.pulsar.reader.PulsarReaderContainerProperties;
@Import(PulsarConfiguration.class)
public class PulsarAutoConfiguration {
private PulsarProperties properties;
private final PulsarProperties properties;
private PulsarPropertiesMapper propertiesMapper;
private final PulsarPropertiesMapper propertiesMapper;
PulsarAutoConfiguration(PulsarProperties properties) {
this.properties = properties;
@ -158,7 +158,7 @@ public class PulsarAutoConfiguration {
containerProperties.setSchemaResolver(schemaResolver);
containerProperties.setTopicResolver(topicResolver);
if (Threading.VIRTUAL.isActive(environment)) {
containerProperties.setConsumerTaskExecutor(new VirtualThreadTaskExecutor());
containerProperties.setConsumerTaskExecutor(new VirtualThreadTaskExecutor("pulsar-consumer-"));
}
this.propertiesMapper.customizeContainerProperties(containerProperties);
return new ConcurrentPulsarListenerContainerFactory<>(pulsarConsumerFactory, containerProperties);
@ -189,7 +189,7 @@ public class PulsarAutoConfiguration {
PulsarReaderContainerProperties readerContainerProperties = new PulsarReaderContainerProperties();
readerContainerProperties.setSchemaResolver(schemaResolver);
if (Threading.VIRTUAL.isActive(environment)) {
readerContainerProperties.setReaderTaskExecutor(new VirtualThreadTaskExecutor());
readerContainerProperties.setReaderTaskExecutor(new VirtualThreadTaskExecutor("pulsar-reader-"));
}
this.propertiesMapper.customizeReaderContainerProperties(readerContainerProperties);
return new DefaultPulsarReaderContainerFactory<>(pulsarReaderFactory, readerContainerProperties);

View File

@ -18,6 +18,7 @@ package org.springframework.boot.autoconfigure.amqp;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLSocketFactory;
@ -558,12 +559,34 @@ class RabbitAutoConfigurationTests {
@Test
@EnabledForJreRange(min = JRE.JAVA_21)
void shouldConfigureVirtualThreads() {
void shouldConfigureVirtualThreadsForSimpleListener() {
this.contextRunner.withPropertyValues("spring.threads.virtual.enabled=true").run((context) -> {
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory = context
.getBean("rabbitListenerContainerFactory", SimpleRabbitListenerContainerFactory.class);
assertThat(rabbitListenerContainerFactory).extracting("taskExecutor")
.isInstanceOf(VirtualThreadTaskExecutor.class);
Object taskExecutor = ReflectionTestUtils.getField(rabbitListenerContainerFactory, "taskExecutor");
Object virtualThread = ReflectionTestUtils.getField(taskExecutor, "virtualThreadFactory");
Thread threadCreated = ((ThreadFactory) virtualThread).newThread(mock(Runnable.class));
assertThat(threadCreated.getName()).containsPattern("rabbit-simple-[0-9]+");
});
}
@Test
@EnabledForJreRange(min = JRE.JAVA_21)
void shouldConfigureVirtualThreadsForDirectListener() {
this.contextRunner.withPropertyValues("spring.threads.virtual.enabled=true").run((context) -> {
DirectRabbitListenerContainerFactoryConfigurer rabbitListenerContainerFactory = context.getBean(
"directRabbitListenerContainerFactoryConfigurer",
DirectRabbitListenerContainerFactoryConfigurer.class);
assertThat(rabbitListenerContainerFactory).extracting("taskExecutor")
.isInstanceOf(VirtualThreadTaskExecutor.class);
Object taskExecutor = ReflectionTestUtils.getField(rabbitListenerContainerFactory, "taskExecutor");
Object virtualThread = ReflectionTestUtils.getField(taskExecutor, "virtualThreadFactory");
Thread threadCreated = ((ThreadFactory) virtualThread).newThread(mock(Runnable.class));
assertThat(threadCreated.getName()).containsPattern("rabbit-direct-[0-9]+");
});
}

View File

@ -18,6 +18,7 @@ package org.springframework.boot.autoconfigure.pulsar;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import com.github.benmanes.caffeine.cache.Caffeine;
@ -69,6 +70,7 @@ import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.pulsar.core.ReaderBuilderCustomizer;
import org.springframework.pulsar.core.SchemaResolver;
import org.springframework.pulsar.core.TopicResolver;
import org.springframework.test.util.ReflectionTestUtils;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
@ -506,6 +508,10 @@ class PulsarAutoConfigurationTests {
.getBean(ConcurrentPulsarListenerContainerFactory.class);
assertThat(factory.getContainerProperties().getConsumerTaskExecutor())
.isInstanceOf(VirtualThreadTaskExecutor.class);
Object taskExecutor = factory.getContainerProperties().getConsumerTaskExecutor();
Object virtualThread = ReflectionTestUtils.getField(taskExecutor, "virtualThreadFactory");
Thread threadCreated = ((ThreadFactory) virtualThread).newThread(mock(Runnable.class));
assertThat(threadCreated.getName()).containsPattern("pulsar-consumer-[0-9]+");
});
}
@ -561,6 +567,10 @@ class PulsarAutoConfigurationTests {
.getBean(DefaultPulsarReaderContainerFactory.class);
assertThat(factory.getContainerProperties().getReaderTaskExecutor())
.isInstanceOf(VirtualThreadTaskExecutor.class);
Object taskExecutor = factory.getContainerProperties().getReaderTaskExecutor();
Object virtualThread = ReflectionTestUtils.getField(taskExecutor, "virtualThreadFactory");
Thread threadCreated = ((ThreadFactory) virtualThread).newThread(mock(Runnable.class));
assertThat(threadCreated.getName()).containsPattern("pulsar-reader-[0-9]+");
});
}