Merge branch '2.2.x'

Closes gh-21208
This commit is contained in:
Brian Clozel 2020-04-28 11:33:50 +02:00
commit dac62476a0
11 changed files with 150 additions and 49 deletions

View File

@ -1,5 +1,5 @@
/*
* Copyright 2012-2019 the original author or authors.
* Copyright 2012-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -18,7 +18,7 @@ package org.springframework.boot.autoconfigure.rsocket;
import java.util.stream.Collectors;
import io.rsocket.RSocketFactory;
import io.rsocket.core.RSocketServer;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.netty.server.TcpServerTransport;
import reactor.netty.http.server.HttpServer;
@ -36,6 +36,7 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.boot.rsocket.context.RSocketServerBootstrap;
import org.springframework.boot.rsocket.netty.NettyRSocketServerFactory;
import org.springframework.boot.rsocket.server.RSocketServerCustomizer;
import org.springframework.boot.rsocket.server.RSocketServerFactory;
import org.springframework.boot.rsocket.server.ServerRSocketFactoryProcessor;
import org.springframework.context.annotation.Bean;
@ -57,7 +58,7 @@ import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHa
* @since 2.2.0
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass({ RSocketFactory.class, RSocketStrategies.class, HttpServer.class, TcpServerTransport.class })
@ConditionalOnClass({ RSocketServer.class, RSocketStrategies.class, HttpServer.class, TcpServerTransport.class })
@ConditionalOnBean(RSocketMessageHandler.class)
@AutoConfigureAfter(RSocketStrategiesAutoConfiguration.class)
@EnableConfigurationProperties(RSocketProperties.class)
@ -69,10 +70,12 @@ public class RSocketServerAutoConfiguration {
@Bean
@ConditionalOnMissingBean
@SuppressWarnings("deprecation")
RSocketWebSocketNettyRouteProvider rSocketWebsocketRouteProvider(RSocketProperties properties,
RSocketMessageHandler messageHandler, ObjectProvider<ServerRSocketFactoryProcessor> processors) {
RSocketMessageHandler messageHandler, ObjectProvider<ServerRSocketFactoryProcessor> processors,
ObjectProvider<RSocketServerCustomizer> customizers) {
return new RSocketWebSocketNettyRouteProvider(properties.getServer().getMappingPath(),
messageHandler.responder(), processors.orderedStream());
messageHandler.responder(), processors.orderedStream(), customizers.orderedStream());
}
}
@ -89,14 +92,17 @@ public class RSocketServerAutoConfiguration {
@Bean
@ConditionalOnMissingBean
@SuppressWarnings("deprecation")
RSocketServerFactory rSocketServerFactory(RSocketProperties properties, ReactorResourceFactory resourceFactory,
ObjectProvider<ServerRSocketFactoryProcessor> processors) {
ObjectProvider<ServerRSocketFactoryProcessor> processors,
ObjectProvider<RSocketServerCustomizer> customizers) {
NettyRSocketServerFactory factory = new NettyRSocketServerFactory();
factory.setResourceFactory(resourceFactory);
factory.setTransport(properties.getServer().getTransport());
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(properties.getServer().getAddress()).to(factory::setAddress);
map.from(properties.getServer().getPort()).to(factory::setPort);
factory.setRSocketServerCustomizers(customizers.orderedStream().collect(Collectors.toList()));
factory.setSocketFactoryProcessors(processors.orderedStream().collect(Collectors.toList()));
return factory;
}
@ -109,13 +115,12 @@ public class RSocketServerAutoConfiguration {
}
@Bean
ServerRSocketFactoryProcessor frameDecoderServerFactoryCustomizer(RSocketMessageHandler rSocketMessageHandler) {
return (serverRSocketFactory) -> {
RSocketServerCustomizer frameDecoderRSocketServerCustomizer(RSocketMessageHandler rSocketMessageHandler) {
return (server) -> {
if (rSocketMessageHandler.getRSocketStrategies()
.dataBufferFactory() instanceof NettyDataBufferFactory) {
return serverRSocketFactory.frameDecoder(PayloadDecoder.ZERO_COPY);
server.payloadDecoder(PayloadDecoder.ZERO_COPY);
}
return serverRSocketFactory;
};
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2012-2019 the original author or authors.
* Copyright 2012-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -22,10 +22,12 @@ import java.util.stream.Stream;
import io.rsocket.RSocketFactory;
import io.rsocket.SocketAcceptor;
import io.rsocket.core.RSocketServer;
import io.rsocket.transport.ServerTransport;
import io.rsocket.transport.netty.server.WebsocketRouteTransport;
import reactor.netty.http.server.HttpServerRoutes;
import org.springframework.boot.rsocket.server.RSocketServerCustomizer;
import org.springframework.boot.rsocket.server.ServerRSocketFactoryProcessor;
import org.springframework.boot.web.embedded.netty.NettyRouteProvider;
@ -34,6 +36,7 @@ import org.springframework.boot.web.embedded.netty.NettyRouteProvider;
*
* @author Brian Clozel
*/
@SuppressWarnings("deprecation")
class RSocketWebSocketNettyRouteProvider implements NettyRouteProvider {
private final String mappingPath;
@ -42,21 +45,24 @@ class RSocketWebSocketNettyRouteProvider implements NettyRouteProvider {
private final List<ServerRSocketFactoryProcessor> processors;
private final List<RSocketServerCustomizer> customizers;
RSocketWebSocketNettyRouteProvider(String mappingPath, SocketAcceptor socketAcceptor,
Stream<ServerRSocketFactoryProcessor> processors) {
Stream<ServerRSocketFactoryProcessor> processors, Stream<RSocketServerCustomizer> customizers) {
this.mappingPath = mappingPath;
this.socketAcceptor = socketAcceptor;
this.processors = processors.collect(Collectors.toList());
this.customizers = customizers.collect(Collectors.toList());
}
@Override
public HttpServerRoutes apply(HttpServerRoutes httpServerRoutes) {
RSocketFactory.ServerRSocketFactory server = RSocketFactory.receive();
for (ServerRSocketFactoryProcessor processor : this.processors) {
server = processor.process(server);
}
ServerTransport.ConnectionAcceptor acceptor = server.acceptor(this.socketAcceptor).toConnectionAcceptor();
return httpServerRoutes.ws(this.mappingPath, WebsocketRouteTransport.newHandler(acceptor));
RSocketServer server = RSocketServer.create(this.socketAcceptor);
RSocketFactory.ServerRSocketFactory factory = new RSocketFactory.ServerRSocketFactory(server);
this.processors.forEach((processor) -> processor.process(factory));
this.customizers.forEach((customizer) -> customizer.customize(server));
ServerTransport.ConnectionAcceptor connectionAcceptor = server.asConnectionAcceptor();
return httpServerRoutes.ws(this.mappingPath, WebsocketRouteTransport.newHandler(connectionAcceptor));
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2012-2019 the original author or authors.
* Copyright 2012-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -18,7 +18,7 @@ package org.springframework.boot.autoconfigure.security.rsocket;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.rsocket.server.ServerRSocketFactoryProcessor;
import org.springframework.boot.rsocket.server.RSocketServerCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.annotation.rsocket.EnableRSocketSecurity;
@ -29,6 +29,7 @@ import org.springframework.security.rsocket.core.SecuritySocketAcceptorIntercept
* server.
*
* @author Madhura Bhave
* @author Brian Clozel
* @since 2.2.0
*/
@Configuration(proxyBeanMethods = false)
@ -37,8 +38,8 @@ import org.springframework.security.rsocket.core.SecuritySocketAcceptorIntercept
public class RSocketSecurityAutoConfiguration {
@Bean
ServerRSocketFactoryProcessor springSecurityRSocketSecurity(SecuritySocketAcceptorInterceptor interceptor) {
return (factory) -> factory.addSocketAcceptorPlugin(interceptor);
RSocketServerCustomizer springSecurityRSocketSecurity(SecuritySocketAcceptorInterceptor interceptor) {
return (server) -> server.interceptors((registry) -> registry.forSocketAcceptor(interceptor));
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2012-2019 the original author or authors.
* Copyright 2012-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -21,8 +21,8 @@ import org.junit.jupiter.api.Test;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.rsocket.context.RSocketPortInfoApplicationContextInitializer;
import org.springframework.boot.rsocket.context.RSocketServerBootstrap;
import org.springframework.boot.rsocket.server.RSocketServerCustomizer;
import org.springframework.boot.rsocket.server.RSocketServerFactory;
import org.springframework.boot.rsocket.server.ServerRSocketFactoryProcessor;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.boot.test.context.runner.ReactiveWebApplicationContextRunner;
import org.springframework.boot.web.server.WebServerFactoryCustomizer;
@ -78,8 +78,7 @@ class RSocketServerAutoConfigurationTests {
void shouldCreateDefaultBeansForRSocketServerWhenPortIsSet() {
reactiveWebContextRunner().withPropertyValues("spring.rsocket.server.port=0")
.run((context) -> assertThat(context).hasSingleBean(RSocketServerFactory.class)
.hasSingleBean(RSocketServerBootstrap.class)
.hasSingleBean(ServerRSocketFactoryProcessor.class));
.hasSingleBean(RSocketServerBootstrap.class).hasSingleBean(RSocketServerCustomizer.class));
}
@Test
@ -87,8 +86,7 @@ class RSocketServerAutoConfigurationTests {
reactiveWebContextRunner().withPropertyValues("spring.rsocket.server.port=0")
.withInitializer(new RSocketPortInfoApplicationContextInitializer()).run((context) -> {
assertThat(context).hasSingleBean(RSocketServerFactory.class)
.hasSingleBean(RSocketServerBootstrap.class)
.hasSingleBean(ServerRSocketFactoryProcessor.class);
.hasSingleBean(RSocketServerBootstrap.class).hasSingleBean(RSocketServerCustomizer.class);
assertThat(context.getEnvironment().getProperty("local.rsocket.server.port")).isNotNull();
});
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2012-2019 the original author or authors.
* Copyright 2012-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
/*
* Copyright 2012-2019 the original author or authors.
* Copyright 2012-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,28 +16,26 @@
package org.springframework.boot.autoconfigure.security.rsocket;
import io.rsocket.RSocketFactory;
import io.rsocket.core.RSocketServer;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.autoconfigure.rsocket.RSocketMessagingAutoConfiguration;
import org.springframework.boot.autoconfigure.rsocket.RSocketStrategiesAutoConfiguration;
import org.springframework.boot.autoconfigure.security.reactive.ReactiveUserDetailsServiceAutoConfiguration;
import org.springframework.boot.rsocket.server.ServerRSocketFactoryProcessor;
import org.springframework.boot.rsocket.server.RSocketServerCustomizer;
import org.springframework.boot.test.context.FilteredClassLoader;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.security.config.annotation.rsocket.RSocketSecurity;
import org.springframework.security.rsocket.core.SecuritySocketAcceptorInterceptor;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
/**
* Tests for {@link RSocketSecurityAutoConfiguration}.
*
* @author Madhura Bhave
* @author Brian Clozel
*/
class RSocketSecurityAutoConfigurationTests {
@ -58,14 +56,15 @@ class RSocketSecurityAutoConfigurationTests {
@Test
void autoConfigurationAddsCustomizerForServerRSocketFactory() {
RSocketFactory.ServerRSocketFactory factory = mock(RSocketFactory.ServerRSocketFactory.class);
ArgumentCaptor<SecuritySocketAcceptorInterceptor> captor = ArgumentCaptor
.forClass(SecuritySocketAcceptorInterceptor.class);
RSocketServer server = RSocketServer.create();
this.contextRunner.run((context) -> {
ServerRSocketFactoryProcessor customizer = context.getBean(ServerRSocketFactoryProcessor.class);
customizer.process(factory);
verify(factory).addSocketAcceptorPlugin(captor.capture());
assertThat(captor.getValue()).isInstanceOf(SecuritySocketAcceptorInterceptor.class);
RSocketServerCustomizer customizer = context.getBean(RSocketServerCustomizer.class);
customizer.customize(server);
server.interceptors((registry) -> registry.forSocketAcceptor((interceptors) -> {
assertThat(interceptors).isNotEmpty();
assertThat(interceptors)
.anyMatch((interceptor) -> interceptor instanceof SecuritySocketAcceptorInterceptor);
}));
});
}

View File

@ -1403,7 +1403,7 @@ bom {
]
}
}
library("RSocket", "1.0.0-RC6") {
library("RSocket", "1.0.0-RC7") {
group("io.rsocket") {
imports = [
"rsocket-bom"

View File

@ -1,5 +1,5 @@
/*
* Copyright 2012-2019 the original author or authors.
* Copyright 2012-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -37,6 +37,7 @@ import reactor.netty.tcp.TcpServer;
import org.springframework.boot.rsocket.server.ConfigurableRSocketServerFactory;
import org.springframework.boot.rsocket.server.RSocketServer;
import org.springframework.boot.rsocket.server.RSocketServerCustomizer;
import org.springframework.boot.rsocket.server.RSocketServerFactory;
import org.springframework.boot.rsocket.server.ServerRSocketFactoryProcessor;
import org.springframework.http.client.reactive.ReactorResourceFactory;
@ -63,6 +64,8 @@ public class NettyRSocketServerFactory implements RSocketServerFactory, Configur
private List<ServerRSocketFactoryProcessor> socketFactoryProcessors = new ArrayList<>();
private List<RSocketServerCustomizer> rSocketServerCustomizers = new ArrayList<>();
@Override
public void setPort(int port) {
this.port = port;
@ -91,7 +94,10 @@ public class NettyRSocketServerFactory implements RSocketServerFactory, Configur
* {@link ServerRSocketFactory} while building the server. Calling this method will
* replace any existing processors.
* @param socketFactoryProcessors processors to apply before the server starts
* @deprecated in favor of {@link #setRSocketServerCustomizers(Collection)} as of
* 2.2.7
*/
@Deprecated
public void setSocketFactoryProcessors(
Collection<? extends ServerRSocketFactoryProcessor> socketFactoryProcessors) {
Assert.notNull(socketFactoryProcessors, "SocketFactoryProcessors must not be null");
@ -102,12 +108,38 @@ public class NettyRSocketServerFactory implements RSocketServerFactory, Configur
* Add {@link ServerRSocketFactoryProcessor}s that should be called to process the
* {@link ServerRSocketFactory} while building the server.
* @param socketFactoryProcessors processors to apply before the server starts
* @deprecated in favor of
* {@link #addRSocketServerCustomizers(RSocketServerCustomizer...)} as of 2.2.7
*/
@Deprecated
public void addSocketFactoryProcessors(ServerRSocketFactoryProcessor... socketFactoryProcessors) {
Assert.notNull(socketFactoryProcessors, "SocketFactoryProcessors must not be null");
this.socketFactoryProcessors.addAll(Arrays.asList(socketFactoryProcessors));
}
/**
* Set {@link RSocketServerCustomizer}s that should be called to configure the
* {@link io.rsocket.core.RSocketServer} while building the server. Calling this
* method will replace any existing customizers.
* @param rSocketServerCustomizers customizers to apply before the server starts
* @since 2.2.7
*/
public void setRSocketServerCustomizers(Collection<? extends RSocketServerCustomizer> rSocketServerCustomizers) {
Assert.notNull(rSocketServerCustomizers, "RSocketServerCustomizers must not be null");
this.rSocketServerCustomizers = new ArrayList<>(rSocketServerCustomizers);
}
/**
* Add {@link RSocketServerCustomizer}s that should be called to configure the
* {@link io.rsocket.core.RSocketServer}.
* @param rSocketServerCustomizers customizers to apply before the server starts
* @since 2.2.7
*/
public void addRSocketServerCustomizers(RSocketServerCustomizer... rSocketServerCustomizers) {
Assert.notNull(rSocketServerCustomizers, "RSocketServerCustomizers must not be null");
this.rSocketServerCustomizers.addAll(Arrays.asList(rSocketServerCustomizers));
}
/**
* Set the maximum amount of time that should be waited when starting or stopping the
* server.
@ -118,13 +150,14 @@ public class NettyRSocketServerFactory implements RSocketServerFactory, Configur
}
@Override
@SuppressWarnings("deprecation")
public NettyRSocketServer create(SocketAcceptor socketAcceptor) {
ServerTransport<CloseableChannel> transport = createTransport();
RSocketFactory.ServerRSocketFactory factory = RSocketFactory.receive();
for (ServerRSocketFactoryProcessor processor : this.socketFactoryProcessors) {
factory = processor.process(factory);
}
Mono<CloseableChannel> starter = factory.acceptor(socketAcceptor).transport(transport).start();
io.rsocket.core.RSocketServer server = io.rsocket.core.RSocketServer.create(socketAcceptor);
RSocketFactory.ServerRSocketFactory factory = new ServerRSocketFactory(server);
this.rSocketServerCustomizers.forEach((customizer) -> customizer.customize(server));
this.socketFactoryProcessors.forEach((processor) -> processor.process(factory));
Mono<CloseableChannel> starter = server.bind(transport);
return new NettyRSocketServer(starter, this.lifecycleTimeout);
}

View File

@ -0,0 +1,37 @@
/*
* Copyright 2012-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.rsocket.server;
import io.rsocket.core.RSocketServer;
/**
* Callback interface that can be used to customize a {@link RSocketServer}.
*
* @author Brian Clozel
* @see RSocketServer
* @since 2.3.0
*/
@FunctionalInterface
public interface RSocketServerCustomizer {
/**
* Callback to customize a {@link RSocketServer} instance.
* @param rSocketServer the RSocket server to customize
*/
void customize(RSocketServer rSocketServer);
}

View File

@ -25,8 +25,10 @@ import io.rsocket.RSocketFactory.ServerRSocketFactory;
* @author Brian Clozel
* @see RSocketServerFactory
* @since 2.2.0
* @deprecated in favor of {@link RSocketServerCustomizer} as of 2.2.7
*/
@FunctionalInterface
@Deprecated
public interface ServerRSocketFactoryProcessor {
/**

View File

@ -37,6 +37,7 @@ import org.mockito.InOrder;
import reactor.core.publisher.Mono;
import org.springframework.boot.rsocket.server.RSocketServer;
import org.springframework.boot.rsocket.server.RSocketServerCustomizer;
import org.springframework.boot.rsocket.server.ServerRSocketFactoryProcessor;
import org.springframework.core.codec.CharSequenceEncoder;
import org.springframework.core.codec.StringDecoder;
@ -49,6 +50,7 @@ import org.springframework.util.SocketUtils;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.will;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
@ -130,6 +132,7 @@ class NettyRSocketServerFactoryTests {
}
@Test
@SuppressWarnings("deprecation")
void serverProcessors() {
NettyRSocketServerFactory factory = getFactory();
ServerRSocketFactoryProcessor[] processors = new ServerRSocketFactoryProcessor[2];
@ -146,6 +149,23 @@ class NettyRSocketServerFactoryTests {
}
}
@Test
void serverCustomizers() {
NettyRSocketServerFactory factory = getFactory();
RSocketServerCustomizer[] customizers = new RSocketServerCustomizer[2];
for (int i = 0; i < customizers.length; i++) {
customizers[i] = mock(RSocketServerCustomizer.class);
will((invocation) -> invocation.getArgument(0)).given(customizers[i])
.customize(any(io.rsocket.core.RSocketServer.class));
}
factory.setRSocketServerCustomizers(Arrays.asList(customizers));
this.server = factory.create(new EchoRequestResponseAcceptor());
InOrder ordered = inOrder((Object[]) customizers);
for (RSocketServerCustomizer customizer : customizers) {
ordered.verify(customizer).customize(any(io.rsocket.core.RSocketServer.class));
}
}
private RSocketRequester createRSocketTcpClient() {
Assertions.assertThat(this.server).isNotNull();
InetSocketAddress address = this.server.address();