diff --git a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/neo4j/Neo4jReactiveHealthIndicator.java b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/neo4j/Neo4jReactiveHealthIndicator.java index c2bc0825fe8..5eaf2319e18 100644 --- a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/neo4j/Neo4jReactiveHealthIndicator.java +++ b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/neo4j/Neo4jReactiveHealthIndicator.java @@ -20,9 +20,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.neo4j.driver.Driver; import org.neo4j.driver.exceptions.SessionExpiredException; -import org.neo4j.driver.reactive.ReactiveResult; -import org.neo4j.driver.reactive.ReactiveSession; -import reactor.adapter.JdkFlowAdapter; +import org.neo4j.driver.reactivestreams.ReactiveResult; +import org.neo4j.driver.reactivestreams.ReactiveSession; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.retry.Retry; @@ -65,14 +65,13 @@ public final class Neo4jReactiveHealthIndicator extends AbstractReactiveHealthIn Mono runHealthCheckQuery() { // We use WRITE here to make sure UP is returned for a server that supports // all possible workloads - return Mono.using(() -> this.driver.reactiveSession(Neo4jHealthIndicator.DEFAULT_SESSION_CONFIG), (session) -> { - Mono resultMono = JdkFlowAdapter - .flowPublisherToFlux(session.run(Neo4jHealthIndicator.CYPHER)).single(); - return resultMono - .flatMapMany((result) -> JdkFlowAdapter.flowPublisherToFlux(result.records()) - .zipWith(JdkFlowAdapter.flowPublisherToFlux(result.consume()))) - .map((tuple) -> new Neo4jHealthDetails(tuple.getT1(), tuple.getT2())).single(); - }, ReactiveSession::close); + return Mono.using(() -> this.driver.session(ReactiveSession.class, Neo4jHealthIndicator.DEFAULT_SESSION_CONFIG), + (session) -> { + Mono resultMono = Flux.from(session.run(Neo4jHealthIndicator.CYPHER)).single(); + return resultMono + .flatMapMany((result) -> Flux.from(result.records()).zipWith(Flux.from(result.consume()))) + .map((tuple) -> new Neo4jHealthDetails(tuple.getT1(), tuple.getT2())).single(); + }, ReactiveSession::close); } -} +} \ No newline at end of file diff --git a/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/neo4j/Neo4jReactiveHealthIndicatorTests.java b/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/neo4j/Neo4jReactiveHealthIndicatorTests.java index cba46a97f19..67a01028d40 100644 --- a/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/neo4j/Neo4jReactiveHealthIndicatorTests.java +++ b/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/neo4j/Neo4jReactiveHealthIndicatorTests.java @@ -25,10 +25,9 @@ import org.neo4j.driver.SessionConfig; import org.neo4j.driver.Values; import org.neo4j.driver.exceptions.ServiceUnavailableException; import org.neo4j.driver.exceptions.SessionExpiredException; -import org.neo4j.driver.reactive.ReactiveResult; -import org.neo4j.driver.reactive.ReactiveSession; +import org.neo4j.driver.reactivestreams.ReactiveResult; +import org.neo4j.driver.reactivestreams.ReactiveSession; import org.neo4j.driver.summary.ResultSummary; -import reactor.adapter.JdkFlowAdapter; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -38,6 +37,7 @@ import org.springframework.boot.actuate.health.Status; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.then; import static org.mockito.Mockito.mock; @@ -72,13 +72,12 @@ class Neo4jReactiveHealthIndicatorTests { AtomicInteger count = new AtomicInteger(); given(session.run(anyString())).will((invocation) -> { if (count.compareAndSet(0, 1)) { - return JdkFlowAdapter - .publisherToFlowPublisher(Flux.error(new SessionExpiredException("Session expired"))); + return Flux.error(new SessionExpiredException("Session expired")); } - return JdkFlowAdapter.publisherToFlowPublisher(Flux.just(statementResult)); + return Flux.just(statementResult); }); Driver driver = mock(Driver.class); - given(driver.reactiveSession(any(SessionConfig.class))).willReturn(session); + given(driver.session(eq(ReactiveSession.class), any(SessionConfig.class))).willReturn(session); Neo4jReactiveHealthIndicator healthIndicator = new Neo4jReactiveHealthIndicator(driver); healthIndicator.health().as(StepVerifier::create).consumeNextWith((health) -> { assertThat(health.getStatus()).isEqualTo(Status.UP); @@ -91,7 +90,8 @@ class Neo4jReactiveHealthIndicatorTests { @Test void neo4jIsDown() { Driver driver = mock(Driver.class); - given(driver.reactiveSession(any(SessionConfig.class))).willThrow(ServiceUnavailableException.class); + given(driver.session(eq(ReactiveSession.class), any(SessionConfig.class))) + .willThrow(ServiceUnavailableException.class); Neo4jReactiveHealthIndicator healthIndicator = new Neo4jReactiveHealthIndicator(driver); healthIndicator.health().as(StepVerifier::create).consumeNextWith((health) -> { assertThat(health.getStatus()).isEqualTo(Status.DOWN); @@ -104,17 +104,17 @@ class Neo4jReactiveHealthIndicatorTests { given(record.get("edition")).willReturn(Values.value(edition)); given(record.get("version")).willReturn(Values.value(version)); ReactiveResult statementResult = mock(ReactiveResult.class); - given(statementResult.records()).willReturn(JdkFlowAdapter.publisherToFlowPublisher(Mono.just(record))); - given(statementResult.consume()).willReturn(JdkFlowAdapter.publisherToFlowPublisher(Mono.just(resultSummary))); + given(statementResult.records()).willReturn(Mono.just(record)); + given(statementResult.consume()).willReturn(Mono.just(resultSummary)); return statementResult; } private Driver mockDriver(ResultSummary resultSummary, String version, String edition) { ReactiveResult statementResult = mockStatementResult(resultSummary, version, edition); ReactiveSession session = mock(ReactiveSession.class); - given(session.run(anyString())).willReturn(JdkFlowAdapter.publisherToFlowPublisher(Mono.just(statementResult))); + given(session.run(anyString())).willReturn(Mono.just(statementResult)); Driver driver = mock(Driver.class); - given(driver.reactiveSession(any(SessionConfig.class))).willReturn(session); + given(driver.session(eq(ReactiveSession.class), any(SessionConfig.class))).willReturn(session); return driver; } diff --git a/spring-boot-project/spring-boot-dependencies/build.gradle b/spring-boot-project/spring-boot-dependencies/build.gradle index 88cad65c0f9..ef3b8a7c1dc 100644 --- a/spring-boot-project/spring-boot-dependencies/build.gradle +++ b/spring-boot-project/spring-boot-dependencies/build.gradle @@ -1091,7 +1091,7 @@ bom { ] } } - library("Neo4j Java Driver", "5.1.0") { + library("Neo4j Java Driver", "5.2.0") { group("org.neo4j.driver") { modules = [ "neo4j-java-driver"