Fix RedisReactiveHealthIndicator in clustered configuration

Prior to Spring Data Redis version 2.2.8, the contents of the
Properties object returned from the
ReactiveRedisConnection.ServerCommands.info API were the same
for clustered and non-clustered Redis configurations, containing a set
of key/value pairs. This allowed ReactiveRedisHealthIndicator to get
a version property using a well-known key. Starting with Spring Data
Redis 2.2.8, the info property keys contain a host:port prefix in a
clustered Redis configuration. This prevented
ReactiveRedisHealthIndicator from getting the version property as
before and resulted in the health always being reported as DOWN.

This commit adjusts ReactiveRedisHealthIndicator to detect the
clustered configuration from Spring Data Redis and find the version
property for one of the reported cluster nodes.

Fixes gh-22061
This commit is contained in:
Scott Frederick 2020-06-23 16:11:43 -05:00
parent 3fed27fd90
commit b27303704f
2 changed files with 40 additions and 4 deletions

View File

@ -24,6 +24,7 @@ import reactor.core.scheduler.Schedulers;
import org.springframework.boot.actuate.health.AbstractReactiveHealthIndicator;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.ReactiveHealthIndicator;
import org.springframework.data.redis.connection.ReactiveRedisClusterConnection;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
@ -33,6 +34,7 @@ import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
* @author Stephane Nicoll
* @author Mark Paluch
* @author Artsiom Yudovin
* @author Scott Frederick
* @since 2.0.0
*/
public class RedisReactiveHealthIndicator extends AbstractReactiveHealthIndicator {
@ -50,7 +52,8 @@ public class RedisReactiveHealthIndicator extends AbstractReactiveHealthIndicato
}
private Mono<Health> doHealthCheck(Health.Builder builder, ReactiveRedisConnection connection) {
return connection.serverCommands().info().map((info) -> up(builder, info))
return connection.serverCommands().info()
.map((info) -> up(builder, info, (connection instanceof ReactiveRedisClusterConnection)))
.onErrorResume((ex) -> Mono.just(down(builder, ex)))
.flatMap((health) -> connection.closeLater().thenReturn(health));
}
@ -60,9 +63,21 @@ public class RedisReactiveHealthIndicator extends AbstractReactiveHealthIndicato
.subscribeOn(Schedulers.boundedElastic());
}
private Health up(Health.Builder builder, Properties info) {
return builder.up()
.withDetail(RedisHealthIndicator.VERSION, info.getProperty(RedisHealthIndicator.REDIS_VERSION)).build();
private Health up(Health.Builder builder, Properties info, boolean isClusterConnection) {
if (isClusterConnection) {
return builder.up().withDetail(RedisHealthIndicator.VERSION, getClusterVersionProperty(info)).build();
}
else {
return builder.up()
.withDetail(RedisHealthIndicator.VERSION, info.getProperty(RedisHealthIndicator.REDIS_VERSION))
.build();
}
}
private Object getClusterVersionProperty(Properties info) {
return info.keySet().stream().map(String.class::cast)
.filter((key) -> key.endsWith(RedisHealthIndicator.REDIS_VERSION)).findFirst().map(info::get)
.orElse("");
}
private Health down(Health.Builder builder, Throwable cause) {

View File

@ -26,6 +26,8 @@ import reactor.test.StepVerifier;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.Status;
import org.springframework.data.redis.RedisConnectionFailureException;
import org.springframework.data.redis.connection.ReactiveClusterServerCommands;
import org.springframework.data.redis.connection.ReactiveRedisClusterConnection;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.ReactiveServerCommands;
@ -42,6 +44,7 @@ import static org.mockito.Mockito.verify;
* @author Mark Paluch
* @author Nikolay Rybak
* @author Artsiom Yudovin
* @author Scott Frederick
*/
class RedisReactiveHealthIndicatorTests {
@ -63,6 +66,24 @@ class RedisReactiveHealthIndicatorTests {
verify(redisConnection).closeLater();
}
@Test
void redisClusterIsUp() {
Properties info = new Properties();
info.put("127.0.0.1:7002.redis_version", "2.8.9");
ReactiveRedisConnection redisConnection = mock(ReactiveRedisClusterConnection.class);
given(redisConnection.closeLater()).willReturn(Mono.empty());
ReactiveClusterServerCommands commands = mock(ReactiveClusterServerCommands.class);
given(commands.info()).willReturn(Mono.just(info));
RedisReactiveHealthIndicator healthIndicator = createHealthIndicator(redisConnection, commands);
Mono<Health> health = healthIndicator.health();
StepVerifier.create(health).consumeNextWith((h) -> {
assertThat(h.getStatus()).isEqualTo(Status.UP);
assertThat(h.getDetails()).containsOnlyKeys("version");
assertThat(h.getDetails().get("version")).isEqualTo("2.8.9");
}).verifyComplete();
verify(redisConnection).closeLater();
}
@Test
void redisCommandIsDown() {
ReactiveServerCommands commands = mock(ReactiveServerCommands.class);