mirror of
https://github.com/spring-projects/spring-boot.git
synced 2024-07-15 01:07:30 +08:00
Fix handling of Flux responses from Actuator endpoints
Closes gh-30095
This commit is contained in:
parent
2f7feee672
commit
6eacc07de0
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2012-2019 the original author or authors.
|
||||
* Copyright 2012-2022 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@ -29,6 +29,7 @@ import javax.management.MBeanException;
|
||||
import javax.management.MBeanInfo;
|
||||
import javax.management.ReflectionException;
|
||||
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.boot.actuate.endpoint.InvalidEndpointRequestException;
|
||||
@ -172,6 +173,9 @@ public class EndpointMBean implements DynamicMBean {
|
||||
private static class ReactiveHandler {
|
||||
|
||||
static Object handle(Object result) {
|
||||
if (result instanceof Flux) {
|
||||
result = ((Flux<?>) result).collectList();
|
||||
}
|
||||
if (result instanceof Mono) {
|
||||
return ((Mono<?>) result).block();
|
||||
}
|
||||
|
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2012-2021 the original author or authors.
|
||||
* Copyright 2012-2022 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@ -38,6 +38,7 @@ import org.glassfish.jersey.process.Inflector;
|
||||
import org.glassfish.jersey.server.ContainerRequest;
|
||||
import org.glassfish.jersey.server.model.Resource;
|
||||
import org.glassfish.jersey.server.model.Resource.Builder;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.boot.actuate.endpoint.InvalidEndpointRequestException;
|
||||
@ -128,6 +129,7 @@ public class JerseyEndpointResourceFactory {
|
||||
List<Function<Object, Object>> converters = new ArrayList<>();
|
||||
converters.add(new ResourceBodyConverter());
|
||||
if (ClassUtils.isPresent("reactor.core.publisher.Mono", OperationInflector.class.getClassLoader())) {
|
||||
converters.add(new FluxBodyConverter());
|
||||
converters.add(new MonoBodyConverter());
|
||||
}
|
||||
BODY_CONVERTERS = Collections.unmodifiableList(converters);
|
||||
@ -268,6 +270,21 @@ public class JerseyEndpointResourceFactory {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Body converter from {@link Flux} to {@link Flux#collectList Mono<List>}.
|
||||
*/
|
||||
private static final class FluxBodyConverter implements Function<Object, Object> {
|
||||
|
||||
@Override
|
||||
public Object apply(Object body) {
|
||||
if (body instanceof Flux) {
|
||||
return ((Flux<?>) body).collectList();
|
||||
}
|
||||
return body;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* {@link Inflector} to for endpoint links.
|
||||
*/
|
||||
|
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2012-2021 the original author or authors.
|
||||
* Copyright 2012-2022 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@ -26,6 +26,7 @@ import java.util.Map;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.scheduler.Schedulers;
|
||||
|
||||
@ -338,6 +339,9 @@ public abstract class AbstractWebFluxEndpointHandlerMapping extends RequestMappi
|
||||
}
|
||||
|
||||
private Mono<ResponseEntity<Object>> handleResult(Publisher<?> result, HttpMethod httpMethod) {
|
||||
if (result instanceof Flux) {
|
||||
result = ((Flux<?>) result).collectList();
|
||||
}
|
||||
return Mono.from(result).map(this::toResponseEntity)
|
||||
.onErrorMap(InvalidEndpointRequestException.class,
|
||||
(ex) -> new ResponseStatusException(HttpStatus.BAD_REQUEST, ex.getReason()))
|
||||
|
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2012-2021 the original author or authors.
|
||||
* Copyright 2012-2022 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@ -19,16 +19,21 @@ package org.springframework.boot.actuate.endpoint.web.servlet;
|
||||
import java.lang.reflect.Method;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.security.Principal;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.function.Function;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.boot.actuate.endpoint.InvalidEndpointRequestException;
|
||||
import org.springframework.boot.actuate.endpoint.InvocationContext;
|
||||
@ -49,6 +54,7 @@ import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.http.server.ServletServerHttpRequest;
|
||||
import org.springframework.util.AntPathMatcher;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.ClassUtils;
|
||||
import org.springframework.util.ReflectionUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
@ -274,6 +280,17 @@ public abstract class AbstractWebMvcEndpointHandlerMapping extends RequestMappin
|
||||
|
||||
private static final String PATH_SEPARATOR = AntPathMatcher.DEFAULT_PATH_SEPARATOR;
|
||||
|
||||
private static final List<Function<Object, Object>> BODY_CONVERTERS;
|
||||
|
||||
static {
|
||||
List<Function<Object, Object>> converters = new ArrayList<>();
|
||||
if (ClassUtils.isPresent("reactor.core.publisher.Flux",
|
||||
ServletWebOperationAdapter.class.getClassLoader())) {
|
||||
converters.add(new FluxBodyConverter());
|
||||
}
|
||||
BODY_CONVERTERS = Collections.unmodifiableList(converters);
|
||||
}
|
||||
|
||||
private final WebOperation operation;
|
||||
|
||||
ServletWebOperationAdapter(WebOperation operation) {
|
||||
@ -350,12 +367,32 @@ public abstract class AbstractWebMvcEndpointHandlerMapping extends RequestMappin
|
||||
(httpMethod != HttpMethod.GET) ? HttpStatus.NO_CONTENT : HttpStatus.NOT_FOUND);
|
||||
}
|
||||
if (!(result instanceof WebEndpointResponse)) {
|
||||
return result;
|
||||
return convertIfNecessary(result);
|
||||
}
|
||||
WebEndpointResponse<?> response = (WebEndpointResponse<?>) result;
|
||||
MediaType contentType = (response.getContentType() != null) ? new MediaType(response.getContentType())
|
||||
: null;
|
||||
return ResponseEntity.status(response.getStatus()).contentType(contentType).body(response.getBody());
|
||||
return ResponseEntity.status(response.getStatus()).contentType(contentType)
|
||||
.body(convertIfNecessary(response.getBody()));
|
||||
}
|
||||
|
||||
private Object convertIfNecessary(Object body) {
|
||||
for (Function<Object, Object> converter : BODY_CONVERTERS) {
|
||||
body = converter.apply(body);
|
||||
}
|
||||
return body;
|
||||
}
|
||||
|
||||
private static class FluxBodyConverter implements Function<Object, Object> {
|
||||
|
||||
@Override
|
||||
public Object apply(Object body) {
|
||||
if (!(body instanceof Flux)) {
|
||||
return body;
|
||||
}
|
||||
return ((Flux<?>) body).collectList();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -27,6 +27,7 @@ import javax.management.MBeanInfo;
|
||||
import javax.management.ReflectionException;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.beans.FatalBeanException;
|
||||
@ -155,6 +156,15 @@ class EndpointMBeanTests {
|
||||
assertThat(result).isEqualTo("monoResult");
|
||||
}
|
||||
|
||||
@Test
|
||||
void invokeWhenFluxResultShouldCollectToMonoListAndBlockOnMono() throws MBeanException, ReflectionException {
|
||||
TestExposableJmxEndpoint endpoint = new TestExposableJmxEndpoint(
|
||||
new TestJmxOperation((arguments) -> Flux.just("flux", "result")));
|
||||
EndpointMBean bean = new EndpointMBean(this.responseMapper, null, endpoint);
|
||||
Object result = bean.invoke("testOperation", NO_PARAMS, NO_SIGNATURE);
|
||||
assertThat(result).asList().containsExactly("flux", "result");
|
||||
}
|
||||
|
||||
@Test
|
||||
void invokeShouldCallResponseMapper() throws MBeanException, ReflectionException {
|
||||
TestJmxOperationResponseMapper responseMapper = spy(this.responseMapper);
|
||||
|
@ -28,6 +28,7 @@ import java.util.function.Consumer;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.boot.actuate.endpoint.SecurityContext;
|
||||
@ -269,6 +270,14 @@ public abstract class AbstractWebEndpointIntegrationTests<T extends Configurable
|
||||
.isOk().expectBody().jsonPath("a").isEqualTo("alpha"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void readOperationWithFluxResponse() {
|
||||
load(FluxResponseEndpointConfiguration.class,
|
||||
(client) -> client.get().uri("/flux").exchange().expectStatus().isOk().expectBody().jsonPath("[0].a")
|
||||
.isEqualTo("alpha").jsonPath("[1].b").isEqualTo("bravo").jsonPath("[2].c")
|
||||
.isEqualTo("charlie"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void readOperationWithCustomMediaType() {
|
||||
load(CustomMediaTypesEndpointConfiguration.class, (client) -> client.get().uri("/custommediatypes").exchange()
|
||||
@ -564,6 +573,17 @@ public abstract class AbstractWebEndpointIntegrationTests<T extends Configurable
|
||||
|
||||
}
|
||||
|
||||
@Configuration(proxyBeanMethods = false)
|
||||
@Import(BaseConfiguration.class)
|
||||
static class FluxResponseEndpointConfiguration {
|
||||
|
||||
@Bean
|
||||
FluxResponseEndpoint testEndpoint(EndpointDelegate endpointDelegate) {
|
||||
return new FluxResponseEndpoint();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Configuration(proxyBeanMethods = false)
|
||||
@Import(BaseConfiguration.class)
|
||||
static class CustomMediaTypesEndpointConfiguration {
|
||||
@ -806,6 +826,17 @@ public abstract class AbstractWebEndpointIntegrationTests<T extends Configurable
|
||||
|
||||
}
|
||||
|
||||
@Endpoint(id = "flux")
|
||||
static class FluxResponseEndpoint {
|
||||
|
||||
@ReadOperation
|
||||
Flux<Map<String, String>> operation() {
|
||||
return Flux.just(Collections.singletonMap("a", "alpha"), Collections.singletonMap("b", "bravo"),
|
||||
Collections.singletonMap("c", "charlie"));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Endpoint(id = "custommediatypes")
|
||||
static class CustomMediaTypesEndpoint {
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user