Add observationEnabled properties for Apache Kafka

See gh-38057
This commit is contained in:
Zhiyang.Wang1 2023-10-26 20:18:46 +08:00 committed by Scott Frederick
parent 9ba46f538b
commit ff9d9de1ee
5 changed files with 40 additions and 1 deletions

View File

@ -236,6 +236,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
map.from(properties::getLogContainerConfig).to(container::setLogContainerConfig);
map.from(properties::isMissingTopicsFatal).to(container::setMissingTopicsFatal);
map.from(properties::isImmediateStop).to(container::setStopImmediate);
map.from(properties::getObservationEnabled).to(container::setObservationEnabled);
map.from(this.transactionManager).to(container::setTransactionManager);
map.from(this.rebalanceListener).to(container::setConsumerRebalanceListener);
map.from(this.listenerTaskExecutor).to(container::setListenerTaskExecutor);

View File

@ -98,6 +98,7 @@ public class KafkaAutoConfiguration {
map.from(kafkaProducerListener).to(kafkaTemplate::setProducerListener);
map.from(this.properties.getTemplate().getDefaultTopic()).to(kafkaTemplate::setDefaultTopic);
map.from(this.properties.getTemplate().getTransactionIdPrefix()).to(kafkaTemplate::setTransactionIdPrefix);
map.from(this.properties.getTemplate().getObservationEnabled()).to(kafkaTemplate::setObservationEnabled);
return kafkaTemplate;
}

View File

@ -984,6 +984,11 @@ public class KafkaProperties {
*/
private String transactionIdPrefix;
/**
* Whether to enable observation.
*/
private boolean observationEnabled;
public String getDefaultTopic() {
return this.defaultTopic;
}
@ -1000,6 +1005,14 @@ public class KafkaProperties {
this.transactionIdPrefix = transactionIdPrefix;
}
public boolean getObservationEnabled() {
return this.observationEnabled;
}
public void setObservationEnabled(boolean observationEnabled) {
this.observationEnabled = observationEnabled;
}
}
public static class Listener {
@ -1117,6 +1130,11 @@ public class KafkaProperties {
*/
private Boolean changeConsumerThreadName;
/**
* Whether to enable observation.
*/
private boolean observationEnabled;
public Type getType() {
return this.type;
}
@ -1261,6 +1279,14 @@ public class KafkaProperties {
this.changeConsumerThreadName = changeConsumerThreadName;
}
public boolean getObservationEnabled() {
return this.observationEnabled;
}
public void setObservationEnabled(boolean observationEnabled) {
this.observationEnabled = observationEnabled;
}
}
public static class Ssl {

View File

@ -1811,6 +1811,10 @@
"name": "spring.kafka.jaas.control-flag",
"defaultValue": "required"
},
{
"name": "spring.kafka.listener.observation-enabled",
"defaultValue": false
},
{
"name": "spring.kafka.listener.only-log-record-metadata",
"type": "java.lang.Boolean",
@ -1905,6 +1909,10 @@
"level": "error"
}
},
{
"name": "spring.kafka.template.observation-enabled",
"defaultValue": false
},
{
"name": "spring.liquibase.check-change-log-location",
"type": "java.lang.Boolean",

View File

@ -618,7 +618,8 @@ class KafkaAutoConfigurationTests {
"spring.kafka.listener.missing-topics-fatal=true", "spring.kafka.jaas.enabled=true",
"spring.kafka.listener.immediate-stop=true", "spring.kafka.producer.transaction-id-prefix=foo",
"spring.kafka.jaas.login-module=foo", "spring.kafka.jaas.control-flag=REQUISITE",
"spring.kafka.jaas.options.useKeyTab=true", "spring.kafka.listener.async-acks=true")
"spring.kafka.jaas.options.useKeyTab=true", "spring.kafka.listener.async-acks=true",
"spring.kafka.template.observation-enabled=true", "spring.kafka.listener.observation-enabled=true")
.run((context) -> {
DefaultKafkaProducerFactory<?, ?> producerFactory = context.getBean(DefaultKafkaProducerFactory.class);
DefaultKafkaConsumerFactory<?, ?> consumerFactory = context.getBean(DefaultKafkaConsumerFactory.class);
@ -629,6 +630,7 @@ class KafkaAutoConfigurationTests {
assertThat(kafkaTemplate).hasFieldOrPropertyWithValue("producerFactory", producerFactory);
assertThat(kafkaTemplate.getDefaultTopic()).isEqualTo("testTopic");
assertThat(kafkaTemplate).hasFieldOrPropertyWithValue("transactionIdPrefix", "txOverride");
assertThat(kafkaTemplate).hasFieldOrPropertyWithValue("observationEnabled", true);
assertThat(kafkaListenerContainerFactory.getConsumerFactory()).isEqualTo(consumerFactory);
ContainerProperties containerProperties = kafkaListenerContainerFactory.getContainerProperties();
assertThat(containerProperties.getAckMode()).isEqualTo(AckMode.MANUAL);
@ -645,6 +647,7 @@ class KafkaAutoConfigurationTests {
assertThat(containerProperties.isLogContainerConfig()).isTrue();
assertThat(containerProperties.isMissingTopicsFatal()).isTrue();
assertThat(containerProperties.isStopImmediate()).isTrue();
assertThat(containerProperties.isObservationEnabled()).isTrue();
assertThat(kafkaListenerContainerFactory).extracting("concurrency").isEqualTo(3);
assertThat(kafkaListenerContainerFactory.isBatchListener()).isTrue();
assertThat(kafkaListenerContainerFactory).hasFieldOrPropertyWithValue("autoStartup", true);