Record cancelled client requests in WebClient

Prior to this commit, cancelled client requests (for example as a result
of a `timeout()` reactor operator would not be recorded by Micrometer.

This commit instruments the cancelled signal for outgoing client
requests and assigns a status `CLIENT_ERROR`.
The cancellation can be intentional (triggering a timeout and falling
back on a faster alternative) or considered as an error. The intent
cannot be derived from the signal itself so we're considering it as a
client error.

Closes gh-18444
This commit is contained in:
Brian Clozel 2020-04-10 22:25:46 +02:00
parent a6d1f1c41c
commit 3879a7505c
6 changed files with 85 additions and 25 deletions

View File

@ -1,5 +1,5 @@
/*
* Copyright 2012-2019 the original author or authors.
* Copyright 2012-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -37,9 +37,9 @@ public class DefaultWebClientExchangeTagsProvider implements WebClientExchangeTa
Tag method = WebClientExchangeTags.method(request);
Tag uri = WebClientExchangeTags.uri(request);
Tag clientName = WebClientExchangeTags.clientName(request);
return Arrays.asList(method, uri, clientName,
(response != null) ? WebClientExchangeTags.status(response) : WebClientExchangeTags.status(throwable),
WebClientExchangeTags.outcome(response));
Tag status = WebClientExchangeTags.status(response, throwable);
Tag outcome = WebClientExchangeTags.outcome(response);
return Arrays.asList(method, uri, clientName, status, outcome);
}
}

View File

@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.util.context.Context;
import org.springframework.boot.actuate.metrics.AutoTimer;
@ -71,16 +72,27 @@ public class MetricsWebClientFilterFunction implements ExchangeFilterFunction {
if (!this.autoTimer.isEnabled()) {
return next.exchange(request);
}
return next.exchange(request).doOnEach((signal) -> {
if (!signal.isOnComplete()) {
Long startTime = getStartTime(signal.getContext());
ClientResponse response = signal.get();
Throwable throwable = signal.getThrowable();
Iterable<Tag> tags = this.tagProvider.tags(request, response, throwable);
this.autoTimer.builder(this.metricName).tags(tags).description("Timer of WebClient operation")
.register(this.meterRegistry).record(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
return next.exchange(request).as((responseMono) -> instrumentResponse(request, responseMono))
.subscriberContext(this::putStartTime);
}
private Mono<ClientResponse> instrumentResponse(ClientRequest request, Mono<ClientResponse> responseMono) {
return Mono.deferWithContext((ctx) -> responseMono.doOnEach((signal) -> {
if (signal.isOnNext() || signal.isOnError()) {
Iterable<Tag> tags = this.tagProvider.tags(request, signal.get(), signal.getThrowable());
recordTimer(tags, getStartTime(ctx));
}
}).subscriberContext(this::putStartTime);
}).doFinally((signalType) -> {
if (SignalType.CANCEL.equals(signalType)) {
Iterable<Tag> tags = this.tagProvider.tags(request, null, null);
recordTimer(tags, getStartTime(ctx));
}
}));
}
private void recordTimer(Iterable<Tag> tags, Long startTime) {
this.autoTimer.builder(this.metricName).tags(tags).description("Timer of WebClient operation")
.register(this.meterRegistry).record(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
}
private Long getStartTime(Context context) {

View File

@ -75,12 +75,33 @@ public final class WebClientExchangeTags {
return (path.startsWith("/") ? path : "/" + path);
}
/**
* Creates a {@code status} {@code Tag} derived from the
* {@link ClientResponse#statusCode()} of the given {@code response} if available, the
* thrown exception otherwise, or considers the request as Cancelled as a last resort.
* @param response the response
* @param throwable the exception
* @return the status tag
* @since 2.3.0
*/
public static Tag status(ClientResponse response, Throwable throwable) {
if (response != null) {
return Tag.of("status", String.valueOf(response.rawStatusCode()));
}
else if (throwable != null) {
return (throwable instanceof IOException) ? IO_ERROR : CLIENT_ERROR;
}
return CLIENT_ERROR;
}
/**
* Creates a {@code status} {@code Tag} derived from the
* {@link ClientResponse#statusCode()} of the given {@code response}.
* @param response the response
* @return the status tag
* @deprecated since 2.3.0 in favor of {@link #status(ClientResponse, Throwable)}
*/
@Deprecated
public static Tag status(ClientResponse response) {
return Tag.of("status", String.valueOf(response.rawStatusCode()));
}
@ -90,7 +111,9 @@ public final class WebClientExchangeTags {
* client.
* @param throwable the exception
* @return the status tag
* @deprecated since 2.3.0 in favor of {@link #status(ClientResponse, Throwable)}
*/
@Deprecated
public static Tag status(Throwable throwable) {
return (throwable instanceof IOException) ? IO_ERROR : CLIENT_ERROR;
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2012-2019 the original author or authors.
* Copyright 2012-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -87,4 +87,11 @@ class DefaultWebClientExchangeTagsProviderTests {
Tag.of("clientName", "example.org"), Tag.of("status", "CLIENT_ERROR"), Tag.of("outcome", "UNKNOWN"));
}
@Test
void tagsWhenCancelledRequestShouldReturnClientErrorStatus() {
Iterable<Tag> tags = this.tagsProvider.tags(this.request, null, null);
assertThat(tags).containsExactlyInAnyOrder(Tag.of("method", "GET"), Tag.of("uri", "/projects/{project}"),
Tag.of("clientName", "example.org"), Tag.of("status", "CLIENT_ERROR"), Tag.of("outcome", "UNKNOWN"));
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2012-2019 the original author or authors.
* Copyright 2012-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -29,6 +29,7 @@ import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import org.springframework.boot.actuate.metrics.AutoTimer;
import org.springframework.http.HttpMethod;
@ -73,7 +74,7 @@ class MetricsWebClientFilterFunctionTests {
ClientRequest request = ClientRequest
.create(HttpMethod.GET, URI.create("https://example.com/projects/spring-boot")).build();
given(this.response.rawStatusCode()).willReturn(HttpStatus.OK.value());
this.filterFunction.filter(request, this.exchange).block(Duration.ofSeconds(30));
this.filterFunction.filter(request, this.exchange).block(Duration.ofSeconds(5));
assertThat(this.registry.get("http.client.requests")
.tags("method", "GET", "uri", "/projects/spring-boot", "status", "200").timer().count()).isEqualTo(1);
}
@ -84,7 +85,7 @@ class MetricsWebClientFilterFunctionTests {
.create(HttpMethod.GET, URI.create("https://example.com/projects/spring-boot"))
.attribute(URI_TEMPLATE_ATTRIBUTE, "/projects/{project}").build();
given(this.response.rawStatusCode()).willReturn(HttpStatus.OK.value());
this.filterFunction.filter(request, this.exchange).block(Duration.ofSeconds(30));
this.filterFunction.filter(request, this.exchange).block(Duration.ofSeconds(5));
assertThat(this.registry.get("http.client.requests")
.tags("method", "GET", "uri", "/projects/{project}", "status", "200").timer().count()).isEqualTo(1);
}
@ -95,7 +96,7 @@ class MetricsWebClientFilterFunctionTests {
.create(HttpMethod.GET, URI.create("https://example.com/projects/spring-boot")).build();
ExchangeFunction errorExchange = (r) -> Mono.error(new IOException());
this.filterFunction.filter(request, errorExchange).onErrorResume(IOException.class, (t) -> Mono.empty())
.block(Duration.ofSeconds(30));
.block(Duration.ofSeconds(5));
assertThat(this.registry.get("http.client.requests")
.tags("method", "GET", "uri", "/projects/spring-boot", "status", "IO_ERROR").timer().count())
.isEqualTo(1);
@ -107,7 +108,19 @@ class MetricsWebClientFilterFunctionTests {
.create(HttpMethod.GET, URI.create("https://example.com/projects/spring-boot")).build();
ExchangeFunction exchange = (r) -> Mono.error(new IllegalArgumentException());
this.filterFunction.filter(request, exchange).onErrorResume(IllegalArgumentException.class, (t) -> Mono.empty())
.block(Duration.ofSeconds(30));
.block(Duration.ofSeconds(5));
assertThat(this.registry.get("http.client.requests")
.tags("method", "GET", "uri", "/projects/spring-boot", "status", "CLIENT_ERROR").timer().count())
.isEqualTo(1);
}
@Test
void filterWhenCancelThrownShouldRecordTimer() {
ClientRequest request = ClientRequest
.create(HttpMethod.GET, URI.create("https://example.com/projects/spring-boot")).build();
given(this.response.rawStatusCode()).willReturn(HttpStatus.OK.value());
Mono<ClientResponse> filter = this.filterFunction.filter(request, this.exchange);
StepVerifier.create(filter).thenCancel().verify(Duration.ofSeconds(5));
assertThat(this.registry.get("http.client.requests")
.tags("method", "GET", "uri", "/projects/spring-boot", "status", "CLIENT_ERROR").timer().count())
.isEqualTo(1);
@ -120,7 +133,7 @@ class MetricsWebClientFilterFunctionTests {
ExchangeFunction exchange = (r) -> Mono.error(new IllegalArgumentException())
.delaySubscription(Duration.ofMillis(300)).cast(ClientResponse.class);
this.filterFunction.filter(request, exchange).retry(1)
.onErrorResume(IllegalArgumentException.class, (t) -> Mono.empty()).block(Duration.ofSeconds(30));
.onErrorResume(IllegalArgumentException.class, (t) -> Mono.empty()).block(Duration.ofSeconds(5));
Timer timer = this.registry.get("http.client.requests")
.tags("method", "GET", "uri", "/projects/spring-boot", "status", "CLIENT_ERROR").timer();
assertThat(timer.count()).isEqualTo(2);

View File

@ -1,5 +1,5 @@
/*
* Copyright 2012-2019 the original author or authors.
* Copyright 2012-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -86,24 +86,29 @@ class WebClientExchangeTagsTests {
@Test
void status() {
given(this.response.rawStatusCode()).willReturn(HttpStatus.OK.value());
assertThat(WebClientExchangeTags.status(this.response)).isEqualTo(Tag.of("status", "200"));
assertThat(WebClientExchangeTags.status(this.response, null)).isEqualTo(Tag.of("status", "200"));
}
@Test
void statusWhenIOException() {
assertThat(WebClientExchangeTags.status(new IOException())).isEqualTo(Tag.of("status", "IO_ERROR"));
assertThat(WebClientExchangeTags.status(null, new IOException())).isEqualTo(Tag.of("status", "IO_ERROR"));
}
@Test
void statusWhenClientException() {
assertThat(WebClientExchangeTags.status(new IllegalArgumentException()))
assertThat(WebClientExchangeTags.status(null, new IllegalArgumentException()))
.isEqualTo(Tag.of("status", "CLIENT_ERROR"));
}
@Test
void statusWhenNonStandard() {
given(this.response.rawStatusCode()).willReturn(490);
assertThat(WebClientExchangeTags.status(this.response)).isEqualTo(Tag.of("status", "490"));
assertThat(WebClientExchangeTags.status(this.response, null)).isEqualTo(Tag.of("status", "490"));
}
@Test
void statusWhenCancelled() {
assertThat(WebClientExchangeTags.status(null, null)).isEqualTo(Tag.of("status", "CLIENT_ERROR"));
}
@Test