Record server metrics for cancelled connections

Prior to this commit, the Actuator instrumentation for WebFlux servers
would not record metrics in two cases:

* the client disconnects before the response has been sent
* a server timeout is triggered before the response is sent

This commit improves the existing instrumentation to record metrics in
these cases. Since the causes of timeouts/disconnections can vary a lot,
the chosen "outcome" tag for metrics is "UNKNOWN".

Closes gh-23606
This commit is contained in:
Brian Clozel 2021-03-29 21:27:50 +02:00
parent 9b65409e23
commit 61573fbf14
6 changed files with 120 additions and 25 deletions

View File

@ -0,0 +1,29 @@
/*
* Copyright 2012-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.actuate.metrics.web.reactive.server;
/**
* Runtime exception that materializes a {@link reactor.core.publisher.SignalType#CANCEL
* cancel signal} for the WebFlux server metrics instrumentation.
*
* @author Brian Clozel
* @since 2.5.0
* @see MetricsWebFilter
*/
public class CancelledServerWebExchangeException extends RuntimeException {
}

View File

@ -71,7 +71,8 @@ public class DefaultWebFluxTagsProvider implements WebFluxTagsProvider {
@Override
public Iterable<Tag> httpRequestTags(ServerWebExchange exchange, Throwable exception) {
Tags tags = Tags.of(WebFluxTags.method(exchange), WebFluxTags.uri(exchange, this.ignoreTrailingSlash),
WebFluxTags.exception(exception), WebFluxTags.status(exchange), WebFluxTags.outcome(exchange));
WebFluxTags.exception(exception), WebFluxTags.status(exchange),
WebFluxTags.outcome(exchange, exception));
for (WebFluxTagsContributor contributor : this.contributors) {
tags = tags.and(contributor.httpRequestTags(exchange, exception));
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2012-2020 the original author or authors.
* Copyright 2012-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -75,28 +75,24 @@ public class MetricsWebFilter implements WebFilter {
private Publisher<Void> filter(ServerWebExchange exchange, Mono<Void> call) {
long start = System.nanoTime();
return call.doOnSuccess((done) -> onSuccess(exchange, start))
.doOnError((cause) -> onError(exchange, start, cause));
return call.doOnEach((signal) -> onTerminalSignal(exchange, signal.getThrowable(), start))
.doOnCancel(() -> onTerminalSignal(exchange, new CancelledServerWebExchangeException(), start));
}
private void onSuccess(ServerWebExchange exchange, long start) {
record(exchange, start, null);
}
private void onError(ServerWebExchange exchange, long start, Throwable cause) {
private void onTerminalSignal(ServerWebExchange exchange, Throwable cause, long start) {
ServerHttpResponse response = exchange.getResponse();
if (response.isCommitted()) {
record(exchange, start, cause);
if (response.isCommitted() || cause instanceof CancelledServerWebExchangeException) {
record(exchange, cause, start);
}
else {
response.beforeCommit(() -> {
record(exchange, start, cause);
record(exchange, cause, start);
return Mono.empty();
});
}
}
private void record(ServerWebExchange exchange, long start, Throwable cause) {
private void record(ServerWebExchange exchange, Throwable cause, long start) {
Iterable<Tag> tags = this.tagsProvider.httpRequestTags(exchange, cause);
this.autoTimer.builder(this.metricName).tags(tags).register(this.registry).record(System.nanoTime() - start,
TimeUnit.NANOSECONDS);

View File

@ -1,5 +1,5 @@
/*
* Copyright 2012-2020 the original author or authors.
* Copyright 2012-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,6 +16,9 @@
package org.springframework.boot.actuate.metrics.web.reactive.server;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;
import io.micrometer.core.instrument.Tag;
@ -35,6 +38,7 @@ import org.springframework.web.util.pattern.PathPattern;
* @author Jon Schneider
* @author Andy Wilkinson
* @author Michael McFadyen
* @author Brian Clozel
* @since 2.0.0
*/
public final class WebFluxTags {
@ -51,6 +55,9 @@ public final class WebFluxTags {
private static final Pattern FORWARD_SLASHES_PATTERN = Pattern.compile("//+");
private static final Set<String> DISCONNECTED_CLIENT_EXCEPTIONS = new HashSet<>(
Arrays.asList("AbortedException", "ClientAbortException", "EOFException", "EofException"));
private WebFluxTags() {
}
@ -165,8 +172,28 @@ public final class WebFluxTags {
* @param exchange the exchange
* @return the outcome tag derived from the response status
* @since 2.1.0
* @deprecated as of 2.5.0 in favor of {@link #outcome(ServerWebExchange, Throwable)},
* to be removed in 2.7.0
*/
public static Tag outcome(ServerWebExchange exchange) {
return outcome(exchange, null);
}
/**
* Creates an {@code outcome} tag based on the response status of the given
* {@code exchange} and the exception thrown during request processing.
* @param exchange the exchange
* @param exception the termination signal sent by the publisher
* @return the outcome tag derived from the response status
* @since 2.5.0
*/
public static Tag outcome(ServerWebExchange exchange, Throwable exception) {
if (exception != null) {
if (exception instanceof CancelledServerWebExchangeException
|| DISCONNECTED_CLIENT_EXCEPTIONS.contains(exception.getClass().getSimpleName())) {
return Outcome.UNKNOWN.asTag();
}
}
Integer statusCode = extractStatusCode(exchange);
Outcome outcome = (statusCode != null) ? Outcome.forStatus(statusCode) : Outcome.SUCCESS;
return outcome.asTag();

View File

@ -1,5 +1,5 @@
/*
* Copyright 2012-2020 the original author or authors.
* Copyright 2012-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,6 +16,7 @@
package org.springframework.boot.actuate.metrics.web.reactive.server;
import java.io.EOFException;
import java.time.Duration;
import io.micrometer.core.instrument.MockClock;
@ -24,6 +25,7 @@ import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import org.springframework.boot.actuate.metrics.AutoTimer;
import org.springframework.mock.http.server.reactive.MockServerHttpRequest;
@ -116,6 +118,32 @@ class MetricsWebFilterTests {
assertThat(this.registry.get(REQUEST_METRICS_NAME).tag("status", "200").timer().count()).isEqualTo(2);
}
@Test
void cancelledConnectionsShouldProduceMetrics() {
MockServerWebExchange exchange = createExchange("/projects/spring-boot", "/projects/{project}");
Mono<Void> processing = this.webFilter.filter(exchange,
(serverWebExchange) -> exchange.getResponse().setComplete());
StepVerifier.create(processing).thenCancel().verify(Duration.ofSeconds(5));
assertMetricsContainsTag("uri", "/projects/{project}");
assertMetricsContainsTag("status", "200");
assertMetricsContainsTag("outcome", "UNKNOWN");
}
@Test
void disconnectedExceptionShouldProduceMetrics() {
MockServerWebExchange exchange = createExchange("/projects/spring-boot", "/projects/{project}");
Mono<Void> processing = this.webFilter
.filter(exchange, (serverWebExchange) -> Mono.error(new EOFException("Disconnected")))
.onErrorResume((t) -> {
exchange.getResponse().setRawStatusCode(500);
return exchange.getResponse().setComplete();
});
StepVerifier.create(processing).expectComplete().verify(Duration.ofSeconds(5));
assertMetricsContainsTag("uri", "/projects/{project}");
assertMetricsContainsTag("status", "500");
assertMetricsContainsTag("outcome", "UNKNOWN");
}
private MockServerWebExchange createExchange(String path, String pathPattern) {
PathPatternParser parser = new PathPatternParser();
MockServerWebExchange exchange = MockServerWebExchange.from(MockServerHttpRequest.get(path).build());

View File

@ -1,5 +1,5 @@
/*
* Copyright 2012-2020 the original author or authors.
* Copyright 2012-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,6 +16,8 @@
package org.springframework.boot.actuate.metrics.web.reactive.server;
import java.io.EOFException;
import io.micrometer.core.instrument.Tag;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -140,7 +142,7 @@ class WebFluxTagsTests {
@Test
void outcomeTagIsSuccessWhenResponseStatusIsNull() {
this.exchange.getResponse().setStatusCode(null);
Tag tag = WebFluxTags.outcome(this.exchange);
Tag tag = WebFluxTags.outcome(this.exchange, null);
assertThat(tag.getValue()).isEqualTo("SUCCESS");
}
@ -153,56 +155,68 @@ class WebFluxTagsTests {
given(response.getRawStatusCode()).willReturn(null);
given(exchange.getRequest()).willReturn(request);
given(exchange.getResponse()).willReturn(response);
Tag tag = WebFluxTags.outcome(exchange);
Tag tag = WebFluxTags.outcome(exchange, null);
assertThat(tag.getValue()).isEqualTo("SUCCESS");
}
@Test
void outcomeTagIsInformationalWhenResponseIs1xx() {
this.exchange.getResponse().setStatusCode(HttpStatus.CONTINUE);
Tag tag = WebFluxTags.outcome(this.exchange);
Tag tag = WebFluxTags.outcome(this.exchange, null);
assertThat(tag.getValue()).isEqualTo("INFORMATIONAL");
}
@Test
void outcomeTagIsSuccessWhenResponseIs2xx() {
this.exchange.getResponse().setStatusCode(HttpStatus.OK);
Tag tag = WebFluxTags.outcome(this.exchange);
Tag tag = WebFluxTags.outcome(this.exchange, null);
assertThat(tag.getValue()).isEqualTo("SUCCESS");
}
@Test
void outcomeTagIsRedirectionWhenResponseIs3xx() {
this.exchange.getResponse().setStatusCode(HttpStatus.MOVED_PERMANENTLY);
Tag tag = WebFluxTags.outcome(this.exchange);
Tag tag = WebFluxTags.outcome(this.exchange, null);
assertThat(tag.getValue()).isEqualTo("REDIRECTION");
}
@Test
void outcomeTagIsClientErrorWhenResponseIs4xx() {
this.exchange.getResponse().setStatusCode(HttpStatus.BAD_REQUEST);
Tag tag = WebFluxTags.outcome(this.exchange);
Tag tag = WebFluxTags.outcome(this.exchange, null);
assertThat(tag.getValue()).isEqualTo("CLIENT_ERROR");
}
@Test
void outcomeTagIsServerErrorWhenResponseIs5xx() {
this.exchange.getResponse().setStatusCode(HttpStatus.BAD_GATEWAY);
Tag tag = WebFluxTags.outcome(this.exchange);
Tag tag = WebFluxTags.outcome(this.exchange, null);
assertThat(tag.getValue()).isEqualTo("SERVER_ERROR");
}
@Test
void outcomeTagIsClientErrorWhenResponseIsNonStandardInClientSeries() {
this.exchange.getResponse().setRawStatusCode(490);
Tag tag = WebFluxTags.outcome(this.exchange);
Tag tag = WebFluxTags.outcome(this.exchange, null);
assertThat(tag.getValue()).isEqualTo("CLIENT_ERROR");
}
@Test
void outcomeTagIsUnknownWhenResponseStatusIsInUnknownSeries() {
this.exchange.getResponse().setRawStatusCode(701);
Tag tag = WebFluxTags.outcome(this.exchange);
Tag tag = WebFluxTags.outcome(this.exchange, null);
assertThat(tag.getValue()).isEqualTo("UNKNOWN");
}
@Test
void outcomeTagIsClientErrorWhenExceptionIsDisconnectedClient() {
Tag tag = WebFluxTags.outcome(this.exchange, new EOFException("broken pipe"));
assertThat(tag.getValue()).isEqualTo("UNKNOWN");
}
@Test
void outcomeTagIsClientErrorWhenExceptionIsCancelledExchange() {
Tag tag = WebFluxTags.outcome(this.exchange, new CancelledServerWebExchangeException());
assertThat(tag.getValue()).isEqualTo("UNKNOWN");
}