From ff9d9de1eeae66dbad0b163bbd47e8ea1c1440f5 Mon Sep 17 00:00:00 2001 From: "Zhiyang.Wang1" Date: Thu, 26 Oct 2023 20:18:46 +0800 Subject: [PATCH] Add observationEnabled properties for Apache Kafka See gh-38057 --- ...fkaListenerContainerFactoryConfigurer.java | 1 + .../kafka/KafkaAutoConfiguration.java | 1 + .../autoconfigure/kafka/KafkaProperties.java | 26 +++++++++++++++++++ ...itional-spring-configuration-metadata.json | 8 ++++++ .../kafka/KafkaAutoConfigurationTests.java | 5 +++- 5 files changed, 40 insertions(+), 1 deletion(-) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java index 1a2102ad8a2..95108c275e0 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java @@ -236,6 +236,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { map.from(properties::getLogContainerConfig).to(container::setLogContainerConfig); map.from(properties::isMissingTopicsFatal).to(container::setMissingTopicsFatal); map.from(properties::isImmediateStop).to(container::setStopImmediate); + map.from(properties::getObservationEnabled).to(container::setObservationEnabled); map.from(this.transactionManager).to(container::setTransactionManager); map.from(this.rebalanceListener).to(container::setConsumerRebalanceListener); map.from(this.listenerTaskExecutor).to(container::setListenerTaskExecutor); diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java index 9e2b2f22c8c..b9d2edc045f 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java @@ -98,6 +98,7 @@ public class KafkaAutoConfiguration { map.from(kafkaProducerListener).to(kafkaTemplate::setProducerListener); map.from(this.properties.getTemplate().getDefaultTopic()).to(kafkaTemplate::setDefaultTopic); map.from(this.properties.getTemplate().getTransactionIdPrefix()).to(kafkaTemplate::setTransactionIdPrefix); + map.from(this.properties.getTemplate().getObservationEnabled()).to(kafkaTemplate::setObservationEnabled); return kafkaTemplate; } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index a7da43eea30..f1ad2df87e0 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -984,6 +984,11 @@ public class KafkaProperties { */ private String transactionIdPrefix; + /** + * Whether to enable observation. + */ + private boolean observationEnabled; + public String getDefaultTopic() { return this.defaultTopic; } @@ -1000,6 +1005,14 @@ public class KafkaProperties { this.transactionIdPrefix = transactionIdPrefix; } + public boolean getObservationEnabled() { + return this.observationEnabled; + } + + public void setObservationEnabled(boolean observationEnabled) { + this.observationEnabled = observationEnabled; + } + } public static class Listener { @@ -1117,6 +1130,11 @@ public class KafkaProperties { */ private Boolean changeConsumerThreadName; + /** + * Whether to enable observation. + */ + private boolean observationEnabled; + public Type getType() { return this.type; } @@ -1261,6 +1279,14 @@ public class KafkaProperties { this.changeConsumerThreadName = changeConsumerThreadName; } + public boolean getObservationEnabled() { + return this.observationEnabled; + } + + public void setObservationEnabled(boolean observationEnabled) { + this.observationEnabled = observationEnabled; + } + } public static class Ssl { diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json b/spring-boot-project/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json index 7d21dfd05e2..70140406295 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json @@ -1811,6 +1811,10 @@ "name": "spring.kafka.jaas.control-flag", "defaultValue": "required" }, + { + "name": "spring.kafka.listener.observation-enabled", + "defaultValue": false + }, { "name": "spring.kafka.listener.only-log-record-metadata", "type": "java.lang.Boolean", @@ -1905,6 +1909,10 @@ "level": "error" } }, + { + "name": "spring.kafka.template.observation-enabled", + "defaultValue": false + }, { "name": "spring.liquibase.check-change-log-location", "type": "java.lang.Boolean", diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index 2b8c48d3e02..4adc774e8d7 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -618,7 +618,8 @@ class KafkaAutoConfigurationTests { "spring.kafka.listener.missing-topics-fatal=true", "spring.kafka.jaas.enabled=true", "spring.kafka.listener.immediate-stop=true", "spring.kafka.producer.transaction-id-prefix=foo", "spring.kafka.jaas.login-module=foo", "spring.kafka.jaas.control-flag=REQUISITE", - "spring.kafka.jaas.options.useKeyTab=true", "spring.kafka.listener.async-acks=true") + "spring.kafka.jaas.options.useKeyTab=true", "spring.kafka.listener.async-acks=true", + "spring.kafka.template.observation-enabled=true", "spring.kafka.listener.observation-enabled=true") .run((context) -> { DefaultKafkaProducerFactory producerFactory = context.getBean(DefaultKafkaProducerFactory.class); DefaultKafkaConsumerFactory consumerFactory = context.getBean(DefaultKafkaConsumerFactory.class); @@ -629,6 +630,7 @@ class KafkaAutoConfigurationTests { assertThat(kafkaTemplate).hasFieldOrPropertyWithValue("producerFactory", producerFactory); assertThat(kafkaTemplate.getDefaultTopic()).isEqualTo("testTopic"); assertThat(kafkaTemplate).hasFieldOrPropertyWithValue("transactionIdPrefix", "txOverride"); + assertThat(kafkaTemplate).hasFieldOrPropertyWithValue("observationEnabled", true); assertThat(kafkaListenerContainerFactory.getConsumerFactory()).isEqualTo(consumerFactory); ContainerProperties containerProperties = kafkaListenerContainerFactory.getContainerProperties(); assertThat(containerProperties.getAckMode()).isEqualTo(AckMode.MANUAL); @@ -645,6 +647,7 @@ class KafkaAutoConfigurationTests { assertThat(containerProperties.isLogContainerConfig()).isTrue(); assertThat(containerProperties.isMissingTopicsFatal()).isTrue(); assertThat(containerProperties.isStopImmediate()).isTrue(); + assertThat(containerProperties.isObservationEnabled()).isTrue(); assertThat(kafkaListenerContainerFactory).extracting("concurrency").isEqualTo(3); assertThat(kafkaListenerContainerFactory.isBatchListener()).isTrue(); assertThat(kafkaListenerContainerFactory).hasFieldOrPropertyWithValue("autoStartup", true);