Set virtual thread names for RabbitMQ and Pulsar

See gh-39958
This commit is contained in:
Maziz 2024-03-17 17:30:15 +08:00 committed by Moritz Halbritter
parent f1ccc944e5
commit ecda754116
4 changed files with 63 additions and 5 deletions

View File

@ -47,6 +47,16 @@ import org.springframework.core.task.VirtualThreadTaskExecutor;
@ConditionalOnClass(EnableRabbit.class)
class RabbitAnnotationDrivenConfiguration {
/**
* Default Name of the thread created for simple rabbit listener.
*/
public static final String THREADNAME_RABBIT_SIMPLE = "rabbit-simple-";
/**
* Default Name of the thread created for direct rabbit listener.
*/
public static final String THREADNAME_RABBIT_DIRECT = "rabbit-direct-";
private final ObjectProvider<MessageConverter> messageConverter;
private final ObjectProvider<MessageRecoverer> messageRecoverer;
@ -76,7 +86,7 @@ class RabbitAnnotationDrivenConfiguration {
@ConditionalOnThreading(Threading.VIRTUAL)
SimpleRabbitListenerContainerFactoryConfigurer simpleRabbitListenerContainerFactoryConfigurerVirtualThreads() {
SimpleRabbitListenerContainerFactoryConfigurer configurer = simpleListenerConfigurer();
configurer.setTaskExecutor(new VirtualThreadTaskExecutor());
configurer.setTaskExecutor(new VirtualThreadTaskExecutor(THREADNAME_RABBIT_SIMPLE));
return configurer;
}
@ -105,7 +115,7 @@ class RabbitAnnotationDrivenConfiguration {
@ConditionalOnThreading(Threading.VIRTUAL)
DirectRabbitListenerContainerFactoryConfigurer directRabbitListenerContainerFactoryConfigurerVirtualThreads() {
DirectRabbitListenerContainerFactoryConfigurer configurer = directListenerConfigurer();
configurer.setTaskExecutor(new VirtualThreadTaskExecutor());
configurer.setTaskExecutor(new VirtualThreadTaskExecutor(THREADNAME_RABBIT_DIRECT));
return configurer;
}

View File

@ -73,6 +73,16 @@ import org.springframework.pulsar.reader.PulsarReaderContainerProperties;
@Import(PulsarConfiguration.class)
public class PulsarAutoConfiguration {
/**
* Default Name of the thread created for pulsar consumer.
*/
public static final String THREADNAME_PULSAR_CONSUMER = "pulsar-consumer-";
/**
* Default Name of the thread created for pulsar task executor.
*/
public static final String THREADNAME_PULSAR_TASKEXECUTOR = "pulsar-taskexecutor-";
private PulsarProperties properties;
private PulsarPropertiesMapper propertiesMapper;
@ -158,7 +168,7 @@ public class PulsarAutoConfiguration {
containerProperties.setSchemaResolver(schemaResolver);
containerProperties.setTopicResolver(topicResolver);
if (Threading.VIRTUAL.isActive(environment)) {
containerProperties.setConsumerTaskExecutor(new VirtualThreadTaskExecutor());
containerProperties.setConsumerTaskExecutor(new VirtualThreadTaskExecutor(THREADNAME_PULSAR_CONSUMER));
}
this.propertiesMapper.customizeContainerProperties(containerProperties);
return new ConcurrentPulsarListenerContainerFactory<>(pulsarConsumerFactory, containerProperties);
@ -189,7 +199,8 @@ public class PulsarAutoConfiguration {
PulsarReaderContainerProperties readerContainerProperties = new PulsarReaderContainerProperties();
readerContainerProperties.setSchemaResolver(schemaResolver);
if (Threading.VIRTUAL.isActive(environment)) {
readerContainerProperties.setReaderTaskExecutor(new VirtualThreadTaskExecutor());
readerContainerProperties
.setReaderTaskExecutor(new VirtualThreadTaskExecutor(THREADNAME_PULSAR_TASKEXECUTOR));
}
this.propertiesMapper.customizeReaderContainerProperties(readerContainerProperties);
return new DefaultPulsarReaderContainerFactory<>(pulsarReaderFactory, readerContainerProperties);

View File

@ -18,6 +18,7 @@ package org.springframework.boot.autoconfigure.amqp;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLSocketFactory;
@ -545,12 +546,36 @@ class RabbitAutoConfigurationTests {
@Test
@EnabledForJreRange(min = JRE.JAVA_21)
void shouldConfigureVirtualThreads() {
void shouldConfigureVirtualThreadsForSimpleListener() {
this.contextRunner.withPropertyValues("spring.threads.virtual.enabled=true").run((context) -> {
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory = context
.getBean("rabbitListenerContainerFactory", SimpleRabbitListenerContainerFactory.class);
assertThat(rabbitListenerContainerFactory).extracting("taskExecutor")
.isInstanceOf(VirtualThreadTaskExecutor.class);
final var taskExecutor = ReflectionTestUtils.getField(rabbitListenerContainerFactory, "taskExecutor");
final var virtualThread = ReflectionTestUtils.getField(taskExecutor, "virtualThreadFactory");
final var threadCreated = ((ThreadFactory) virtualThread).newThread(mock(Runnable.class));
assertThat(threadCreated.getName())
.containsPattern(RabbitAnnotationDrivenConfiguration.THREADNAME_RABBIT_SIMPLE + "[0-9]*");
});
}
@Test
@EnabledForJreRange(min = JRE.JAVA_21)
void shouldConfigureVirtualThreadsForDirectListener() {
this.contextRunner.withPropertyValues("spring.threads.virtual.enabled=true").run((context) -> {
DirectRabbitListenerContainerFactoryConfigurer rabbitListenerContainerFactory = context.getBean(
"directRabbitListenerContainerFactoryConfigurer",
DirectRabbitListenerContainerFactoryConfigurer.class);
assertThat(rabbitListenerContainerFactory).extracting("taskExecutor")
.isInstanceOf(VirtualThreadTaskExecutor.class);
final var taskExecutor = ReflectionTestUtils.getField(rabbitListenerContainerFactory, "taskExecutor");
final var virtualThread = ReflectionTestUtils.getField(taskExecutor, "virtualThreadFactory");
final var threadCreated = ((ThreadFactory) virtualThread).newThread(mock(Runnable.class));
assertThat(threadCreated.getName())
.containsPattern(RabbitAnnotationDrivenConfiguration.THREADNAME_RABBIT_DIRECT + "[0-9]*");
});
}

View File

@ -18,6 +18,7 @@ package org.springframework.boot.autoconfigure.pulsar;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import com.github.benmanes.caffeine.cache.Caffeine;
@ -69,6 +70,7 @@ import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.pulsar.core.ReaderBuilderCustomizer;
import org.springframework.pulsar.core.SchemaResolver;
import org.springframework.pulsar.core.TopicResolver;
import org.springframework.test.util.ReflectionTestUtils;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
@ -506,6 +508,11 @@ class PulsarAutoConfigurationTests {
.getBean(ConcurrentPulsarListenerContainerFactory.class);
assertThat(factory.getContainerProperties().getConsumerTaskExecutor())
.isInstanceOf(VirtualThreadTaskExecutor.class);
final var taskExecutor = factory.getContainerProperties().getConsumerTaskExecutor();
final var virtualThread = ReflectionTestUtils.getField(taskExecutor, "virtualThreadFactory");
final var threadCreated = ((ThreadFactory) virtualThread).newThread(mock(Runnable.class));
assertThat(threadCreated.getName())
.containsPattern(PulsarAutoConfiguration.THREADNAME_PULSAR_CONSUMER + "[0-9]*");
});
}
@ -561,6 +568,11 @@ class PulsarAutoConfigurationTests {
.getBean(DefaultPulsarReaderContainerFactory.class);
assertThat(factory.getContainerProperties().getReaderTaskExecutor())
.isInstanceOf(VirtualThreadTaskExecutor.class);
final var taskExecutor = factory.getContainerProperties().getReaderTaskExecutor();
final var virtualThread = ReflectionTestUtils.getField(taskExecutor, "virtualThreadFactory");
final var threadCreated = ((ThreadFactory) virtualThread).newThread(mock(Runnable.class));
assertThat(threadCreated.getName())
.containsPattern(PulsarAutoConfiguration.THREADNAME_PULSAR_TASKEXECUTOR + "[0-9]*");
});
}