Fix Neo4jReactiveHealthIndicator NoSuchElementException

Update `Neo4jReactiveHealthIndicator` to ensure that `result.records()`
is called before `result.consume()`. Prior to this commit, the indicator
used `zipWith` to merge records with a summary. This worked with the
previous RxJava based driver, but fails with the Reactor based driver
due to a `NoSuchElementException: Source was empty` error.

Fixes gh-33428
This commit is contained in:
Phillip Webb 2022-11-30 20:29:54 -08:00
parent 8849f72ac5
commit 93f8dc76ab

View File

@ -19,9 +19,11 @@ package org.springframework.boot.actuate.neo4j;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.neo4j.driver.Driver;
import org.neo4j.driver.Record;
import org.neo4j.driver.exceptions.SessionExpiredException;
import org.neo4j.driver.reactivestreams.ReactiveResult;
import org.neo4j.driver.reactivestreams.ReactiveSession;
import org.neo4j.driver.summary.ResultSummary;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
@ -36,6 +38,7 @@ import org.springframework.boot.actuate.health.ReactiveHealthIndicator;
*
* @author Michael J. Simons
* @author Stephane Nicoll
* @author Phillip Webb
* @since 2.4.0
*/
public final class Neo4jReactiveHealthIndicator extends AbstractReactiveHealthIndicator {
@ -63,15 +66,40 @@ public final class Neo4jReactiveHealthIndicator extends AbstractReactiveHealthIn
}
Mono<Neo4jHealthDetails> runHealthCheckQuery() {
// We use WRITE here to make sure UP is returned for a server that supports
// all possible workloads
return Mono.using(() -> this.driver.session(ReactiveSession.class, Neo4jHealthIndicator.DEFAULT_SESSION_CONFIG),
(session) -> {
Mono<ReactiveResult> resultMono = Flux.from(session.run(Neo4jHealthIndicator.CYPHER)).single();
return resultMono
.flatMapMany((result) -> Flux.from(result.records()).zipWith(Flux.from(result.consume())))
.map((tuple) -> new Neo4jHealthDetails(tuple.getT1(), tuple.getT2())).single();
}, ReactiveSession::close);
return Mono.using(this::session, this::healthDetails, ReactiveSession::close);
}
private ReactiveSession session() {
return this.driver.session(ReactiveSession.class, Neo4jHealthIndicator.DEFAULT_SESSION_CONFIG);
}
private Mono<Neo4jHealthDetails> healthDetails(ReactiveSession session) {
return Mono.from(session.run(Neo4jHealthIndicator.CYPHER)).flatMap(this::healthDetails);
}
private Mono<? extends Neo4jHealthDetails> healthDetails(ReactiveResult result) {
Flux<Record> records = Flux.from(result.records());
Mono<ResultSummary> summary = Mono.from(result.consume());
Neo4jHealthDetailsBuilder builder = new Neo4jHealthDetailsBuilder();
return records.single().doOnNext(builder::record).then(summary).map(builder::build);
}
/**
* Builder used to create a {@link Neo4jHealthDetails} from a {@link Record} and a
* {@link ResultSummary}.
*/
private static class Neo4jHealthDetailsBuilder {
private Record record;
void record(Record record) {
this.record = record;
}
private Neo4jHealthDetails build(ResultSummary summary) {
return new Neo4jHealthDetails(this.record, summary);
}
}
}