Start building against Spring Kafka 3.0.0-M3 snapshots

See gh-30089
This commit is contained in:
Stephane Nicoll 2022-03-07 20:40:00 +01:00
parent 0bf1090e29
commit e6f6b2068b
5 changed files with 7 additions and 85 deletions

View File

@ -24,11 +24,9 @@ import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.BatchErrorHandler;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.listener.RecordInterceptor;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.kafka.support.converter.MessageConverter;
@ -55,10 +53,6 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
private ConsumerAwareRebalanceListener rebalanceListener;
private ErrorHandler errorHandler;
private BatchErrorHandler batchErrorHandler;
private CommonErrorHandler commonErrorHandler;
private AfterRollbackProcessor<Object, Object> afterRollbackProcessor;
@ -114,22 +108,6 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
this.rebalanceListener = rebalanceListener;
}
/**
* Set the {@link ErrorHandler} to use.
* @param errorHandler the error handler
*/
void setErrorHandler(ErrorHandler errorHandler) {
this.errorHandler = errorHandler;
}
/**
* Set the {@link BatchErrorHandler} to use.
* @param batchErrorHandler the error handler
*/
void setBatchErrorHandler(BatchErrorHandler batchErrorHandler) {
this.batchErrorHandler = batchErrorHandler;
}
/**
* Set the {@link CommonErrorHandler} to use.
* @param commonErrorHandler the error handler.
@ -178,10 +156,6 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
map.from(this.replyTemplate).to(factory::setReplyTemplate);
if (properties.getType().equals(Listener.Type.BATCH)) {
factory.setBatchListener(true);
factory.setBatchErrorHandler(this.batchErrorHandler);
}
else {
factory.setErrorHandler(this.errorHandler);
}
map.from(this.commonErrorHandler).to(factory::setCommonErrorHandler);
map.from(this.afterRollbackProcessor).to(factory::setAfterRollbackProcessor);

View File

@ -1,5 +1,5 @@
/*
* Copyright 2012-2021 the original author or authors.
* Copyright 2012-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -29,10 +29,8 @@ import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.BatchErrorHandler;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.listener.RecordInterceptor;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.kafka.support.converter.BatchMessageConverter;
@ -65,10 +63,6 @@ class KafkaAnnotationDrivenConfiguration {
private final ConsumerAwareRebalanceListener rebalanceListener;
private final ErrorHandler errorHandler;
private final BatchErrorHandler batchErrorHandler;
private final CommonErrorHandler commonErrorHandler;
private final AfterRollbackProcessor<Object, Object> afterRollbackProcessor;
@ -81,8 +75,8 @@ class KafkaAnnotationDrivenConfiguration {
ObjectProvider<BatchMessageConverter> batchMessageConverter,
ObjectProvider<KafkaTemplate<Object, Object>> kafkaTemplate,
ObjectProvider<KafkaAwareTransactionManager<Object, Object>> kafkaTransactionManager,
ObjectProvider<ConsumerAwareRebalanceListener> rebalanceListener, ObjectProvider<ErrorHandler> errorHandler,
ObjectProvider<BatchErrorHandler> batchErrorHandler, ObjectProvider<CommonErrorHandler> commonErrorHandler,
ObjectProvider<ConsumerAwareRebalanceListener> rebalanceListener,
ObjectProvider<CommonErrorHandler> commonErrorHandler,
ObjectProvider<AfterRollbackProcessor<Object, Object>> afterRollbackProcessor,
ObjectProvider<RecordInterceptor<Object, Object>> recordInterceptor) {
this.properties = properties;
@ -93,8 +87,6 @@ class KafkaAnnotationDrivenConfiguration {
this.kafkaTemplate = kafkaTemplate.getIfUnique();
this.transactionManager = kafkaTransactionManager.getIfUnique();
this.rebalanceListener = rebalanceListener.getIfUnique();
this.errorHandler = errorHandler.getIfUnique();
this.batchErrorHandler = batchErrorHandler.getIfUnique();
this.commonErrorHandler = commonErrorHandler.getIfUnique();
this.afterRollbackProcessor = afterRollbackProcessor.getIfUnique();
this.recordInterceptor = recordInterceptor.getIfUnique();
@ -112,8 +104,6 @@ class KafkaAnnotationDrivenConfiguration {
configurer.setReplyTemplate(this.kafkaTemplate);
configurer.setTransactionManager(this.transactionManager);
configurer.setRebalanceListener(this.rebalanceListener);
configurer.setErrorHandler(this.errorHandler);
configurer.setBatchErrorHandler(this.batchErrorHandler);
configurer.setCommonErrorHandler(this.commonErrorHandler);
configurer.setAfterRollbackProcessor(this.afterRollbackProcessor);
configurer.setRecordInterceptor(this.recordInterceptor);

View File

@ -57,12 +57,10 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.BatchErrorHandler;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.listener.RecordInterceptor;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
@ -511,46 +509,6 @@ class KafkaAutoConfigurationTests {
});
}
@Test
void testConcurrentKafkaListenerContainerFactoryWithCustomErrorHandler() {
this.contextRunner.withBean("errorHandler", ErrorHandler.class, () -> mock(ErrorHandler.class))
.run((context) -> {
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class);
assertThat(factory).hasFieldOrPropertyWithValue("errorHandler", context.getBean("errorHandler"));
});
}
@Test
void concurrentKafkaListenerContainerFactoryInBatchModeShouldUseBatchErrorHandler() {
this.contextRunner.withBean("batchErrorHandler", BatchErrorHandler.class, () -> mock(BatchErrorHandler.class))
.withPropertyValues("spring.kafka.listener.type=batch").run((context) -> {
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class);
assertThat(factory).hasFieldOrPropertyWithValue("errorHandler",
context.getBean("batchErrorHandler"));
});
}
@Test
void concurrentKafkaListenerContainerFactoryInBatchModeWhenBatchErrorHandlerNotAvailableShouldBeNull() {
this.contextRunner.withPropertyValues("spring.kafka.listener.type=batch").run((context) -> {
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class);
assertThat(factory).hasFieldOrPropertyWithValue("errorHandler", null);
});
}
@Test
void concurrentKafkaListenerContainerFactoryInBatchModeAndSimpleErrorHandlerShouldBeNull() {
this.contextRunner.withPropertyValues("spring.kafka.listener.type=batch")
.withBean("errorHandler", ErrorHandler.class, () -> mock(ErrorHandler.class)).run((context) -> {
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class);
assertThat(factory).hasFieldOrPropertyWithValue("errorHandler", null);
});
}
@Test
void testConcurrentKafkaListenerContainerFactoryWithCustomCommonErrorHandler() {
this.contextRunner.withBean("errorHandler", CommonErrorHandler.class, () -> mock(CommonErrorHandler.class))
@ -710,7 +668,7 @@ class KafkaAutoConfigurationTests {
@Bean
RecordInterceptor<Object, Object> recordInterceptor() {
return (record) -> record;
return (record, consumer) -> record;
}
}

View File

@ -719,7 +719,7 @@ bom {
]
}
}
library("Kafka", "3.0.0") {
library("Kafka", "3.1.0") {
group("org.apache.kafka") {
modules = [
"connect",
@ -1355,7 +1355,7 @@ bom {
]
}
}
library("Spring Kafka", "3.0.0-M1") {
library("Spring Kafka", "3.0.0-SNAPSHOT") {
group("org.springframework.kafka") {
modules = [
"spring-kafka",

View File

@ -42,7 +42,7 @@ The following component creates a listener endpoint on the `someTopic` topic:
include::code:MyBean[]
If a `KafkaTransactionManager` bean is defined, it is automatically associated to the container factory.
Similarly, if a `RecordFilterStrategy`, `ErrorHandler`, `CommonErrorHandler`, `AfterRollbackProcessor` or `ConsumerAwareRebalanceListener` bean is defined, it is automatically associated to the default factory.
Similarly, if a `RecordFilterStrategy`, `CommonErrorHandler`, `AfterRollbackProcessor` or `ConsumerAwareRebalanceListener` bean is defined, it is automatically associated to the default factory.
Depending on the listener type, a `RecordMessageConverter` or `BatchMessageConverter` bean is associated to the default factory.
If only a `RecordMessageConverter` bean is present for a batch listener, it is wrapped in a `BatchMessageConverter`.