Merge pull request #20917 from dhirenmathur

* pr/20917:
  Polish "Align Kafka's missingTopicsFatal default value"
  Align Kafka's missingTopicsFatal default value

Closes gh-20917
This commit is contained in:
Stephane Nicoll 2020-04-11 10:21:31 +02:00
commit 080c20b628
3 changed files with 45 additions and 16 deletions

View File

@ -1,5 +1,5 @@
/*
* Copyright 2012-2019 the original author or authors.
* Copyright 2012-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -876,7 +876,7 @@ public class KafkaProperties {
* Whether the container should fail to start if at least one of the configured
* topics are not present on the broker.
*/
private boolean missingTopicsFatal = true;
private boolean missingTopicsFatal = false;
public Type getType() {
return this.type;

View File

@ -38,7 +38,6 @@ import org.apache.kafka.streams.StreamsConfig;
import org.junit.jupiter.api.Test;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -364,7 +363,7 @@ class KafkaAutoConfigurationTests {
"spring.kafka.listener.no-poll-threshold=2.5", "spring.kafka.listener.type=batch",
"spring.kafka.listener.idle-event-interval=1s", "spring.kafka.listener.monitor-interval=45",
"spring.kafka.listener.log-container-config=true",
"spring.kafka.listener.missing-topics-fatal=false", "spring.kafka.jaas.enabled=true",
"spring.kafka.listener.missing-topics-fatal=true", "spring.kafka.jaas.enabled=true",
"spring.kafka.producer.transaction-id-prefix=foo", "spring.kafka.jaas.login-module=foo",
"spring.kafka.jaas.control-flag=REQUISITE", "spring.kafka.jaas.options.useKeyTab=true")
.run((context) -> {
@ -389,7 +388,7 @@ class KafkaAutoConfigurationTests {
assertThat(containerProperties.getIdleEventInterval()).isEqualTo(1000L);
assertThat(containerProperties.getMonitorInterval()).isEqualTo(45);
assertThat(containerProperties.isLogContainerConfig()).isTrue();
assertThat(containerProperties.isMissingTopicsFatal()).isFalse();
assertThat(containerProperties.isMissingTopicsFatal()).isTrue();
assertThat(ReflectionTestUtils.getField(kafkaListenerContainerFactory, "concurrency")).isEqualTo(3);
assertThat(kafkaListenerContainerFactory.isBatchListener()).isTrue();
assertThat(context.getBeansOfType(KafkaJaasLoginModuleInitializer.class)).hasSize(1);
@ -403,17 +402,6 @@ class KafkaAutoConfigurationTests {
});
}
@Test
void listenerPropertiesMatchDefaults() {
this.contextRunner.run((context) -> {
Listener listenerProperties = new KafkaProperties().getListener();
AbstractKafkaListenerContainerFactory<?, ?, ?> kafkaListenerContainerFactory = (AbstractKafkaListenerContainerFactory<?, ?, ?>) context
.getBean(KafkaListenerContainerFactory.class);
ContainerProperties containerProperties = kafkaListenerContainerFactory.getContainerProperties();
assertThat(containerProperties.isMissingTopicsFatal()).isEqualTo(listenerProperties.isMissingTopicsFatal());
});
}
@Test
void testKafkaTemplateRecordMessageConverters() {
this.contextRunner.withUserConfiguration(MessageConverterConfiguration.class)

View File

@ -0,0 +1,41 @@
/*
* Copyright 2012-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.autoconfigure.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener;
import org.springframework.kafka.listener.ContainerProperties;
import static org.assertj.core.api.Assertions.assertThat;
/**
* Tests for {@link KafkaProducer}.
*
* @author Stephane Nicoll
*/
class KafkaPropertiesTests {
@Test
void listenerDefaultValuesAreConsistent() {
ContainerProperties container = new ContainerProperties("test");
Listener listenerProperties = new KafkaProperties().getListener();
assertThat(listenerProperties.isMissingTopicsFatal()).isEqualTo(container.isMissingTopicsFatal());
}
}