This commit is contained in:
Phillip Webb 2024-01-11 21:40:18 -08:00
parent 25caded250
commit 653474fc46
4 changed files with 31 additions and 25 deletions

View File

@ -233,7 +233,7 @@ class MicrometerTracingAutoConfigurationTests {
@Bean @Bean
SpanTagAnnotationHandler spanTagAnnotationHandler() { SpanTagAnnotationHandler spanTagAnnotationHandler() {
return new SpanTagAnnotationHandler((aClass) -> null, (aClass) -> null); return new SpanTagAnnotationHandler((valueResolverClass) -> null, (valueExpressionResolverClass) -> null);
} }
} }

View File

@ -115,13 +115,13 @@ public final class DefaultJmsListenerContainerFactoryConfigurer {
public void configure(DefaultJmsListenerContainerFactory factory, ConnectionFactory connectionFactory) { public void configure(DefaultJmsListenerContainerFactory factory, ConnectionFactory connectionFactory) {
Assert.notNull(factory, "Factory must not be null"); Assert.notNull(factory, "Factory must not be null");
Assert.notNull(connectionFactory, "ConnectionFactory must not be null"); Assert.notNull(connectionFactory, "ConnectionFactory must not be null");
factory.setConnectionFactory(connectionFactory);
factory.setPubSubDomain(this.jmsProperties.isPubSubDomain());
factory.setSubscriptionDurable(this.jmsProperties.isSubscriptionDurable());
factory.setClientId(this.jmsProperties.getClientId());
JmsProperties.Listener listenerProperties = this.jmsProperties.getListener(); JmsProperties.Listener listenerProperties = this.jmsProperties.getListener();
Session sessionProperties = listenerProperties.getSession(); Session sessionProperties = listenerProperties.getSession();
factory.setConnectionFactory(connectionFactory);
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(this.jmsProperties::isPubSubDomain).to(factory::setPubSubDomain);
map.from(this.jmsProperties::isSubscriptionDurable).to(factory::setSubscriptionDurable);
map.from(this.jmsProperties::getClientId).to(factory::setClientId);
map.from(this.transactionManager).to(factory::setTransactionManager); map.from(this.transactionManager).to(factory::setTransactionManager);
map.from(this.destinationResolver).to(factory::setDestinationResolver); map.from(this.destinationResolver).to(factory::setDestinationResolver);
map.from(this.messageConverter).to(factory::setMessageConverter); map.from(this.messageConverter).to(factory::setMessageConverter);

View File

@ -64,7 +64,7 @@ final class PulsarPropertiesMapper {
map.from(properties::getConnectionTimeout).to(timeoutProperty(clientBuilder::connectionTimeout)); map.from(properties::getConnectionTimeout).to(timeoutProperty(clientBuilder::connectionTimeout));
map.from(properties::getOperationTimeout).to(timeoutProperty(clientBuilder::operationTimeout)); map.from(properties::getOperationTimeout).to(timeoutProperty(clientBuilder::operationTimeout));
map.from(properties::getLookupTimeout).to(timeoutProperty(clientBuilder::lookupTimeout)); map.from(properties::getLookupTimeout).to(timeoutProperty(clientBuilder::lookupTimeout));
customizeAuthentication(clientBuilder::authentication, properties.getAuthentication()); customizeAuthentication(properties.getAuthentication(), clientBuilder::authentication);
customizeServiceUrlProviderBuilder(clientBuilder::serviceUrl, clientBuilder::serviceUrlProvider, properties, customizeServiceUrlProviderBuilder(clientBuilder::serviceUrl, clientBuilder::serviceUrlProvider, properties,
connectionDetails); connectionDetails);
} }
@ -77,21 +77,11 @@ final class PulsarPropertiesMapper {
serviceUrlConsumer.accept(connectionDetails.getBrokerUrl()); serviceUrlConsumer.accept(connectionDetails.getBrokerUrl());
return; return;
} }
Map<String, Authentication> secondaryAuths = new LinkedHashMap<>(); Map<String, Authentication> secondaryAuths = getSecondaryAuths(failoverProperties);
failoverProperties.getBackupClusters().forEach((cluster) -> {
PulsarProperties.Authentication authentication = cluster.getAuthentication();
if (authentication.getPluginClassName() == null) {
secondaryAuths.put(cluster.getServiceUrl(), null);
}
else {
customizeAuthentication((authPluginClassName, authParams) -> secondaryAuths.put(cluster.getServiceUrl(),
AuthenticationFactory.create(authPluginClassName, authParams)), authentication);
}
});
AutoClusterFailoverBuilder autoClusterFailoverBuilder = new AutoClusterFailoverBuilderImpl(); AutoClusterFailoverBuilder autoClusterFailoverBuilder = new AutoClusterFailoverBuilderImpl();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(connectionDetails::getBrokerUrl).to(autoClusterFailoverBuilder::primary); map.from(connectionDetails::getBrokerUrl).to(autoClusterFailoverBuilder::primary);
map.from(new ArrayList<>(secondaryAuths.keySet())).to(autoClusterFailoverBuilder::secondary); map.from(secondaryAuths::keySet).as(ArrayList::new).to(autoClusterFailoverBuilder::secondary);
map.from(failoverProperties::getFailoverPolicy).to(autoClusterFailoverBuilder::failoverPolicy); map.from(failoverProperties::getFailoverPolicy).to(autoClusterFailoverBuilder::failoverPolicy);
map.from(failoverProperties::getFailOverDelay).to(timeoutProperty(autoClusterFailoverBuilder::failoverDelay)); map.from(failoverProperties::getFailOverDelay).to(timeoutProperty(autoClusterFailoverBuilder::failoverDelay));
map.from(failoverProperties::getSwitchBackDelay) map.from(failoverProperties::getSwitchBackDelay)
@ -101,6 +91,23 @@ final class PulsarPropertiesMapper {
serviceUrlProviderConsumer.accept(autoClusterFailoverBuilder.build()); serviceUrlProviderConsumer.accept(autoClusterFailoverBuilder.build());
} }
private Map<String, Authentication> getSecondaryAuths(PulsarProperties.Failover properties) {
Map<String, Authentication> secondaryAuths = new LinkedHashMap<>();
properties.getBackupClusters().forEach((backupCluster) -> {
PulsarProperties.Authentication authenticationProperties = backupCluster.getAuthentication();
if (authenticationProperties.getPluginClassName() == null) {
secondaryAuths.put(backupCluster.getServiceUrl(), null);
}
else {
customizeAuthentication(authenticationProperties, (authPluginClassName, authParams) -> {
Authentication authentication = AuthenticationFactory.create(authPluginClassName, authParams);
secondaryAuths.put(backupCluster.getServiceUrl(), authentication);
});
}
});
return secondaryAuths;
}
void customizeAdminBuilder(PulsarAdminBuilder adminBuilder, PulsarConnectionDetails connectionDetails) { void customizeAdminBuilder(PulsarAdminBuilder adminBuilder, PulsarConnectionDetails connectionDetails) {
PulsarProperties.Admin properties = this.properties.getAdmin(); PulsarProperties.Admin properties = this.properties.getAdmin();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
@ -108,15 +115,14 @@ final class PulsarPropertiesMapper {
map.from(properties::getConnectionTimeout).to(timeoutProperty(adminBuilder::connectionTimeout)); map.from(properties::getConnectionTimeout).to(timeoutProperty(adminBuilder::connectionTimeout));
map.from(properties::getReadTimeout).to(timeoutProperty(adminBuilder::readTimeout)); map.from(properties::getReadTimeout).to(timeoutProperty(adminBuilder::readTimeout));
map.from(properties::getRequestTimeout).to(timeoutProperty(adminBuilder::requestTimeout)); map.from(properties::getRequestTimeout).to(timeoutProperty(adminBuilder::requestTimeout));
customizeAuthentication(adminBuilder::authentication, properties.getAuthentication()); customizeAuthentication(properties.getAuthentication(), adminBuilder::authentication);
} }
private void customizeAuthentication(AuthenticationConsumer authentication, private void customizeAuthentication(PulsarProperties.Authentication properties, AuthenticationConsumer action) {
PulsarProperties.Authentication properties) { String pluginClassName = properties.getPluginClassName();
if (StringUtils.hasText(properties.getPluginClassName())) { if (StringUtils.hasText(pluginClassName)) {
try { try {
authentication.accept(properties.getPluginClassName(), action.accept(pluginClassName, getAuthenticationParamsJson(properties.getParam()));
getAuthenticationParamsJson(properties.getParam()));
} }
catch (UnsupportedAuthenticationException ex) { catch (UnsupportedAuthenticationException ex) {
throw new IllegalStateException("Unable to configure Pulsar authentication", ex); throw new IllegalStateException("Unable to configure Pulsar authentication", ex);

View File

@ -332,7 +332,7 @@ public class ServerProperties {
/** /**
* The maximum number of sessions that can be stored. * The maximum number of sessions that can be stored.
*/ */
private int maxSessions = 10_000; private int maxSessions = 10000;
@NestedConfigurationProperty @NestedConfigurationProperty
private final Cookie cookie = new Cookie(); private final Cookie cookie = new Cookie();