Polish 'Add Pulsar ConnectionDetails support'

See gh-37197
This commit is contained in:
Phillip Webb 2023-09-06 11:57:03 -07:00
parent 089fef0392
commit 750c597225
13 changed files with 74 additions and 98 deletions

View File

@ -277,5 +277,4 @@ tasks.named("checkSpringConfigurationMetadata").configure {
test {
jvmArgs += "--add-opens=java.base/java.net=ALL-UNNAMED"
jvmArgs += "--add-opens=java.base/sun.net=ALL-UNNAMED"
}

View File

@ -30,12 +30,12 @@ class PropertiesPulsarConnectionDetails implements PulsarConnectionDetails {
}
@Override
public String getPulsarBrokerUrl() {
public String getBrokerUrl() {
return this.pulsarProperties.getClient().getServiceUrl();
}
@Override
public String getPulsarAdminUrl() {
public String getAdminUrl() {
return this.pulsarProperties.getAdmin().getServiceUrl();
}

View File

@ -82,20 +82,13 @@ class PulsarConfiguration {
DefaultPulsarClientFactory pulsarClientFactory(PulsarConnectionDetails connectionDetails,
ObjectProvider<PulsarClientBuilderCustomizer> customizersProvider) {
List<PulsarClientBuilderCustomizer> allCustomizers = new ArrayList<>();
allCustomizers.add(this.propertiesMapper::customizeClientBuilder);
allCustomizers.add((clientBuilder) -> this.applyConnectionDetails(connectionDetails, clientBuilder));
allCustomizers.add((builder) -> this.propertiesMapper.customizeClientBuilder(builder, connectionDetails));
allCustomizers.addAll(customizersProvider.orderedStream().toList());
DefaultPulsarClientFactory clientFactory = new DefaultPulsarClientFactory(
(clientBuilder) -> applyClientBuilderCustomizers(allCustomizers, clientBuilder));
return clientFactory;
}
private void applyConnectionDetails(PulsarConnectionDetails connectionDetails, ClientBuilder clientBuilder) {
if (connectionDetails.getPulsarBrokerUrl() != null) {
clientBuilder.serviceUrl(connectionDetails.getPulsarBrokerUrl());
}
}
private void applyClientBuilderCustomizers(List<PulsarClientBuilderCustomizer> customizers,
ClientBuilder clientBuilder) {
customizers.forEach((customizer) -> customizer.customize(clientBuilder));
@ -112,18 +105,11 @@ class PulsarConfiguration {
PulsarAdministration pulsarAdministration(PulsarConnectionDetails connectionDetails,
ObjectProvider<PulsarAdminBuilderCustomizer> pulsarAdminBuilderCustomizers) {
List<PulsarAdminBuilderCustomizer> allCustomizers = new ArrayList<>();
allCustomizers.add(this.propertiesMapper::customizeAdminBuilder);
allCustomizers.add((adminBuilder) -> this.applyConnectionDetails(connectionDetails, adminBuilder));
allCustomizers.add((builder) -> this.propertiesMapper.customizeAdminBuilder(builder, connectionDetails));
allCustomizers.addAll(pulsarAdminBuilderCustomizers.orderedStream().toList());
return new PulsarAdministration((adminBuilder) -> applyAdminBuilderCustomizers(allCustomizers, adminBuilder));
}
private void applyConnectionDetails(PulsarConnectionDetails connectionDetails, PulsarAdminBuilder adminBuilder) {
if (connectionDetails.getPulsarAdminUrl() != null) {
adminBuilder.serviceHttpUrl(connectionDetails.getPulsarAdminUrl());
}
}
private void applyAdminBuilderCustomizers(List<PulsarAdminBuilderCustomizer> customizers,
PulsarAdminBuilder adminBuilder) {
customizers.forEach((customizer) -> customizer.customize(adminBuilder));

View File

@ -27,15 +27,15 @@ import org.springframework.boot.autoconfigure.service.connection.ConnectionDetai
public interface PulsarConnectionDetails extends ConnectionDetails {
/**
* Returns the Pulsar service URL for the broker.
* @return the Pulsar service URL for the broker
* URL used to connect to the broker.
* @return the service URL
*/
String getPulsarBrokerUrl();
String getBrokerUrl();
/**
* Returns the Pulsar web URL for the admin endpoint.
* @return the Pulsar web URL for the admin endpoint
* URL user to connect to the admin endpoint.
* @return the admin URL
*/
String getPulsarAdminUrl();
String getAdminUrl();
}

View File

@ -49,21 +49,20 @@ final class PulsarPropertiesMapper {
this.properties = properties;
}
void customizeClientBuilder(ClientBuilder clientBuilder) {
void customizeClientBuilder(ClientBuilder clientBuilder, PulsarConnectionDetails connectionDetails) {
PulsarProperties.Client properties = this.properties.getClient();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(properties::getServiceUrl).to(clientBuilder::serviceUrl);
map.from(connectionDetails::getBrokerUrl).to(clientBuilder::serviceUrl);
map.from(properties::getConnectionTimeout).to(timeoutProperty(clientBuilder::connectionTimeout));
map.from(properties::getOperationTimeout).to(timeoutProperty(clientBuilder::operationTimeout));
map.from(properties::getLookupTimeout).to(timeoutProperty(clientBuilder::lookupTimeout));
customizeAuthentication(clientBuilder::authentication, properties.getAuthentication());
}
void customizeAdminBuilder(PulsarAdminBuilder adminBuilder) {
void customizeAdminBuilder(PulsarAdminBuilder adminBuilder, PulsarConnectionDetails connectionDetails) {
PulsarProperties.Admin properties = this.properties.getAdmin();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(properties::getServiceUrl).to(adminBuilder::serviceHttpUrl);
map.from(connectionDetails::getAdminUrl).to(adminBuilder::serviceHttpUrl);
map.from(properties::getConnectionTimeout).to(timeoutProperty(adminBuilder::connectionTimeout));
map.from(properties::getReadTimeout).to(timeoutProperty(adminBuilder::readTimeout));
map.from(properties::getRequestTimeout).to(timeoutProperty(adminBuilder::requestTimeout));

View File

@ -28,19 +28,19 @@ import static org.assertj.core.api.Assertions.assertThat;
class PropertiesPulsarConnectionDetailsTests {
@Test
void pulsarBrokerUrlIsObtainedFromPulsarProperties() {
var pulsarProps = new PulsarProperties();
pulsarProps.getClient().setServiceUrl("foo");
var connectionDetails = new PropertiesPulsarConnectionDetails(pulsarProps);
assertThat(connectionDetails.getPulsarBrokerUrl()).isEqualTo("foo");
void getClientServiceUrlReturnsValueFromProperties() {
PulsarProperties properties = new PulsarProperties();
properties.getClient().setServiceUrl("foo");
PulsarConnectionDetails connectionDetails = new PropertiesPulsarConnectionDetails(properties);
assertThat(connectionDetails.getBrokerUrl()).isEqualTo("foo");
}
@Test
void pulsarAdminUrlIsObtainedFromPulsarProperties() {
var pulsarProps = new PulsarProperties();
pulsarProps.getAdmin().setServiceUrl("foo");
var connectionDetails = new PropertiesPulsarConnectionDetails(pulsarProps);
assertThat(connectionDetails.getPulsarAdminUrl()).isEqualTo("foo");
void getAdminServiceHttpUrlReturnsValueFromProperties() {
PulsarProperties properties = new PulsarProperties();
properties.getAdmin().setServiceUrl("foo");
PulsarConnectionDetails connectionDetails = new PropertiesPulsarConnectionDetails(properties);
assertThat(connectionDetails.getAdminUrl()).isEqualTo("foo");
}
}

View File

@ -96,36 +96,20 @@ class PulsarConfigurationTests {
.run((context) -> assertThat(context).getBean(PulsarClient.class).isSameAs(customClient));
}
@Test
void whenConnectionDetailsAreNullTheyAreNotApplied() {
PulsarConnectionDetails connectionDetails = mock(PulsarConnectionDetails.class);
given(connectionDetails.getPulsarBrokerUrl()).willReturn(null);
PulsarConfigurationTests.this.contextRunner.withBean(PulsarConnectionDetails.class, () -> connectionDetails)
.withPropertyValues("spring.pulsar.client.service-url=fromPropsCustomizer")
.run((context) -> {
DefaultPulsarClientFactory clientFactory = context.getBean(DefaultPulsarClientFactory.class);
Customizers<PulsarClientBuilderCustomizer, ClientBuilder> customizers = Customizers
.of(ClientBuilder.class, PulsarClientBuilderCustomizer::customize);
assertThat(customizers.fromField(clientFactory, "customizer"))
.callsInOrder(ClientBuilder::serviceUrl, "fromPropsCustomizer");
});
}
@Test
void whenHasUserDefinedCustomizersAppliesInCorrectOrder() {
PulsarConnectionDetails connectionDetails = mock(PulsarConnectionDetails.class);
given(connectionDetails.getPulsarBrokerUrl()).willReturn("fromConnectionDetailsCustomizer");
given(connectionDetails.getBrokerUrl()).willReturn("connectiondetails");
PulsarConfigurationTests.this.contextRunner
.withUserConfiguration(PulsarClientBuilderCustomizersConfig.class)
.withBean(PulsarConnectionDetails.class, () -> connectionDetails)
.withPropertyValues("spring.pulsar.client.service-url=fromPropsCustomizer")
.withPropertyValues("spring.pulsar.client.service-url=properties")
.run((context) -> {
DefaultPulsarClientFactory clientFactory = context.getBean(DefaultPulsarClientFactory.class);
Customizers<PulsarClientBuilderCustomizer, ClientBuilder> customizers = Customizers
.of(ClientBuilder.class, PulsarClientBuilderCustomizer::customize);
assertThat(customizers.fromField(clientFactory, "customizer")).callsInOrder(
ClientBuilder::serviceUrl, "fromPropsCustomizer", "fromConnectionDetailsCustomizer",
"fromCustomizer1", "fromCustomizer2");
ClientBuilder::serviceUrl, "connectiondetails", "fromCustomizer1", "fromCustomizer2");
});
}
@ -162,35 +146,20 @@ class PulsarConfigurationTests {
.isSameAs(pulsarAdministration));
}
@Test
void whenConnectionDetailsAreNullTheyAreNotApplied() {
PulsarConnectionDetails connectionDetails = mock(PulsarConnectionDetails.class);
given(connectionDetails.getPulsarAdminUrl()).willReturn(null);
PulsarConfigurationTests.this.contextRunner.withBean(PulsarConnectionDetails.class, () -> connectionDetails)
.withPropertyValues("spring.pulsar.admin.service-url=fromPropsCustomizer")
.run((context) -> {
PulsarAdministration pulsarAdmin = context.getBean(PulsarAdministration.class);
Customizers<PulsarAdminBuilderCustomizer, PulsarAdminBuilder> customizers = Customizers
.of(PulsarAdminBuilder.class, PulsarAdminBuilderCustomizer::customize);
assertThat(customizers.fromField(pulsarAdmin, "adminCustomizers"))
.callsInOrder(PulsarAdminBuilder::serviceHttpUrl, "fromPropsCustomizer");
});
}
@Test
void whenHasUserDefinedCustomizersAppliesInCorrectOrder() {
PulsarConnectionDetails connectionDetails = mock(PulsarConnectionDetails.class);
given(connectionDetails.getPulsarAdminUrl()).willReturn("fromConnectionDetailsCustomizer");
given(connectionDetails.getAdminUrl()).willReturn("connectiondetails");
this.contextRunner.withUserConfiguration(PulsarAdminBuilderCustomizersConfig.class)
.withBean(PulsarConnectionDetails.class, () -> connectionDetails)
.withPropertyValues("spring.pulsar.admin.service-url=fromPropsCustomizer")
.withPropertyValues("spring.pulsar.admin.service-url=property")
.run((context) -> {
PulsarAdministration pulsarAdmin = context.getBean(PulsarAdministration.class);
Customizers<PulsarAdminBuilderCustomizer, PulsarAdminBuilder> customizers = Customizers
.of(PulsarAdminBuilder.class, PulsarAdminBuilderCustomizer::customize);
assertThat(customizers.fromField(pulsarAdmin, "adminCustomizers")).callsInOrder(
PulsarAdminBuilder::serviceHttpUrl, "fromPropsCustomizer",
"fromConnectionDetailsCustomizer", "fromCustomizer1", "fromCustomizer2");
PulsarAdminBuilder::serviceHttpUrl, "connectiondetails", "fromCustomizer1",
"fromCustomizer2");
});
}

View File

@ -41,6 +41,7 @@ import org.springframework.boot.autoconfigure.pulsar.PulsarProperties.Consumer;
import org.springframework.pulsar.listener.PulsarContainerProperties;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;
import static org.mockito.Mockito.mock;
@ -60,7 +61,8 @@ class PulsarPropertiesMapperTests {
properties.getClient().setOperationTimeout(Duration.ofSeconds(2));
properties.getClient().setLookupTimeout(Duration.ofSeconds(3));
ClientBuilder builder = mock(ClientBuilder.class);
new PulsarPropertiesMapper(properties).customizeClientBuilder(builder);
new PulsarPropertiesMapper(properties).customizeClientBuilder(builder,
new PropertiesPulsarConnectionDetails(properties));
then(builder).should().serviceUrl("https://example.com");
then(builder).should().connectionTimeout(1000, TimeUnit.MILLISECONDS);
then(builder).should().operationTimeout(2000, TimeUnit.MILLISECONDS);
@ -74,10 +76,22 @@ class PulsarPropertiesMapperTests {
properties.getClient().getAuthentication().setPluginClassName("myclass");
properties.getClient().getAuthentication().setParam(params);
ClientBuilder builder = mock(ClientBuilder.class);
new PulsarPropertiesMapper(properties).customizeClientBuilder(builder);
new PulsarPropertiesMapper(properties).customizeClientBuilder(builder,
new PropertiesPulsarConnectionDetails(properties));
then(builder).should().authentication("myclass", params);
}
@Test
void customizeClientBuilderWhenHasConnectionDetails() {
PulsarProperties properties = new PulsarProperties();
properties.getClient().setServiceUrl("https://ignored.example.com");
ClientBuilder builder = mock(ClientBuilder.class);
PulsarConnectionDetails connectionDetails = mock(PulsarConnectionDetails.class);
given(connectionDetails.getBrokerUrl()).willReturn("https://used.example.com");
new PulsarPropertiesMapper(properties).customizeClientBuilder(builder, connectionDetails);
then(builder).should().serviceUrl("https://used.example.com");
}
@Test
void customizeAdminBuilderWhenHasNoAuthentication() {
PulsarProperties properties = new PulsarProperties();
@ -86,7 +100,8 @@ class PulsarPropertiesMapperTests {
properties.getAdmin().setReadTimeout(Duration.ofSeconds(2));
properties.getAdmin().setRequestTimeout(Duration.ofSeconds(3));
PulsarAdminBuilder builder = mock(PulsarAdminBuilder.class);
new PulsarPropertiesMapper(properties).customizeAdminBuilder(builder);
new PulsarPropertiesMapper(properties).customizeAdminBuilder(builder,
new PropertiesPulsarConnectionDetails(properties));
then(builder).should().serviceHttpUrl("https://example.com");
then(builder).should().connectionTimeout(1000, TimeUnit.MILLISECONDS);
then(builder).should().readTimeout(2000, TimeUnit.MILLISECONDS);
@ -100,10 +115,22 @@ class PulsarPropertiesMapperTests {
properties.getAdmin().getAuthentication().setPluginClassName("myclass");
properties.getAdmin().getAuthentication().setParam(params);
PulsarAdminBuilder builder = mock(PulsarAdminBuilder.class);
new PulsarPropertiesMapper(properties).customizeAdminBuilder(builder);
new PulsarPropertiesMapper(properties).customizeAdminBuilder(builder,
new PropertiesPulsarConnectionDetails(properties));
then(builder).should().authentication("myclass", params);
}
@Test
void customizeAdminBuilderWhenHasConnectionDetails() {
PulsarProperties properties = new PulsarProperties();
properties.getAdmin().setServiceUrl("https://ignored.example.com");
PulsarAdminBuilder builder = mock(PulsarAdminBuilder.class);
PulsarConnectionDetails connectionDetails = mock(PulsarConnectionDetails.class);
given(connectionDetails.getAdminUrl()).willReturn("https://used.example.com");
new PulsarPropertiesMapper(properties).customizeAdminBuilder(builder, connectionDetails);
then(builder).should().serviceHttpUrl("https://used.example.com");
}
@Test
@SuppressWarnings("unchecked")
void customizeProducerBuilder() {

View File

@ -17,6 +17,7 @@
package org.springframework.boot.docker.compose.service.connection.pulsar;
import org.springframework.boot.autoconfigure.pulsar.PulsarConnectionDetails;
import org.springframework.boot.docker.compose.core.ConnectionPorts;
import org.springframework.boot.docker.compose.core.RunningService;
import org.springframework.boot.docker.compose.service.connection.DockerComposeConnectionDetailsFactory;
import org.springframework.boot.docker.compose.service.connection.DockerComposeConnectionSource;
@ -30,9 +31,9 @@ import org.springframework.boot.docker.compose.service.connection.DockerComposeC
class PulsarDockerComposeConnectionDetailsFactory
extends DockerComposeConnectionDetailsFactory<PulsarConnectionDetails> {
private static final int PULSAR_BROKER_PORT = 6650;
private static final int BROKER_PORT = 6650;
private static final int PULSAR_ADMIN_PORT = 8080;
private static final int ADMIN_PORT = 8080;
PulsarDockerComposeConnectionDetailsFactory() {
super("apachepulsar/pulsar");
@ -55,17 +56,18 @@ class PulsarDockerComposeConnectionDetailsFactory
PulsarDockerComposeConnectionDetails(RunningService service) {
super(service);
this.brokerUrl = "pulsar://%s:%s".formatted(service.host(), service.ports().get(PULSAR_BROKER_PORT));
this.adminUrl = "http://%s:%s".formatted(service.host(), service.ports().get(PULSAR_ADMIN_PORT));
ConnectionPorts ports = service.ports();
this.brokerUrl = "pulsar://%s:%s".formatted(service.host(), ports.get(BROKER_PORT));
this.adminUrl = "http://%s:%s".formatted(service.host(), ports.get(ADMIN_PORT));
}
@Override
public String getPulsarBrokerUrl() {
public String getBrokerUrl() {
return this.brokerUrl;
}
@Override
public String getPulsarAdminUrl() {
public String getAdminUrl() {
return this.adminUrl;
}

View File

@ -39,8 +39,8 @@ class PulsarDockerComposeConnectionDetailsFactoryIntegrationTests extends Abstra
void runCreatesConnectionDetails() {
PulsarConnectionDetails connectionDetails = run(PulsarConnectionDetails.class);
assertThat(connectionDetails).isNotNull();
assertThat(connectionDetails.getPulsarBrokerUrl()).matches("^pulsar:\\/\\/\\S+:\\d+");
assertThat(connectionDetails.getPulsarAdminUrl()).matches("^http:\\/\\/\\S+:\\d+");
assertThat(connectionDetails.getBrokerUrl()).matches("^pulsar:\\/\\/\\S+:\\d+");
assertThat(connectionDetails.getAdminUrl()).matches("^http:\\/\\/\\S+:\\d+");
}
}

View File

@ -56,8 +56,3 @@ dependencies {
testRuntimeOnly("com.oracle.database.r2dbc:oracle-r2dbc")
}
test {
jvmArgs += "--add-opens=java.base/java.net=ALL-UNNAMED"
jvmArgs += "--add-opens=java.base/sun.net=ALL-UNNAMED"
}

View File

@ -48,12 +48,12 @@ class PulsarContainerConnectionDetailsFactory
}
@Override
public String getPulsarBrokerUrl() {
public String getBrokerUrl() {
return getContainer().getPulsarBrokerUrl();
}
@Override
public String getPulsarAdminUrl() {
public String getAdminUrl() {
return getContainer().getHttpServiceUrl();
}

View File

@ -44,7 +44,6 @@ class SamplePulsarApplicationTests {
@Container
@ServiceConnection
@SuppressWarnings("unused")
static final PulsarContainer container = new PulsarContainer(DockerImageNames.pulsar()).withStartupAttempts(2)
.withStartupTimeout(Duration.ofMinutes(3));