Fix race condition in Jetty graceful shutdown tests

Some of the Jetty graceful shutdown tests were flaky due to the way
in which Jetty behaves when it is stopped.
Stopping the Jetty web server interrupts the thread that's handling
the active request. This initiates a race between the request-handling
thread which will decrement the number of active requests and the
main thread which expects an active request to cause the shutdown
result to be REQUESTS_ACTIVE. The test passes when the main thread
wins and fails as a request is active which it's checked. When the
request-handling thread wins the test fails as the count of active
requests has been deprecated before it is checked.

The blocking servlet that's used to stall a request and keep it
active needs to be updated to ignore the thread being interrupted
and continue waiting. This will ensure that a request remains active
until the main thread has checked the active request count and
determine the result of the shutdown.

Closes gh-27464
This commit is contained in:
Andy Wilkinson 2021-07-23 10:25:38 +01:00
parent 8ebeedbf93
commit 9e81fb3e38

View File

@ -44,11 +44,11 @@ import java.util.EnumSet;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
@ -1136,8 +1136,8 @@ public abstract class AbstractServletWebServerFactoryTests {
AtomicReference<GracefulShutdownResult> result = new AtomicReference<>();
this.webServer.shutDownGracefully(result::set);
this.webServer.stop();
Awaitility.await().atMost(Duration.ofSeconds(30))
.until(() -> GracefulShutdownResult.REQUESTS_ACTIVE == result.get());
assertThat(Awaitility.await().atMost(Duration.ofSeconds(30)).until(result::get, Objects::nonNull))
.isEqualTo(GracefulShutdownResult.REQUESTS_ACTIVE);
try {
blockingServlet.admitOne();
}
@ -1486,7 +1486,7 @@ public abstract class AbstractServletWebServerFactoryTests {
protected static class BlockingServlet extends HttpServlet {
private final BlockingQueue<CyclicBarrier> barriers = new ArrayBlockingQueue<>(10);
private final BlockingQueue<Blocker> blockers = new ArrayBlockingQueue<>(10);
public BlockingServlet() {
@ -1494,42 +1494,23 @@ public abstract class AbstractServletWebServerFactoryTests {
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
CyclicBarrier barrier = new CyclicBarrier(2);
this.barriers.add(barrier);
try {
barrier.await();
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
catch (BrokenBarrierException ex) {
throw new ServletException(ex);
}
Blocker blocker = new Blocker();
this.blockers.add(blocker);
blocker.await();
}
public void admitOne() {
try {
CyclicBarrier barrier = this.barriers.take();
if (!barrier.isBroken()) {
barrier.await();
}
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
catch (BrokenBarrierException ex) {
throw new RuntimeException(ex);
}
public void admitOne() throws InterruptedException {
this.blockers.take().clear();
}
public void awaitQueue() throws InterruptedException {
while (this.barriers.isEmpty()) {
while (this.blockers.isEmpty()) {
Thread.sleep(100);
}
}
public void awaitQueue(int size) throws InterruptedException {
while (this.barriers.size() < size) {
while (this.blockers.size() < size) {
Thread.sleep(100);
}
}
@ -1538,45 +1519,58 @@ public abstract class AbstractServletWebServerFactoryTests {
static class BlockingAsyncServlet extends HttpServlet {
private final BlockingQueue<CyclicBarrier> barriers = new ArrayBlockingQueue<>(10);
private final BlockingQueue<Blocker> blockers = new ArrayBlockingQueue<>(10);
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
CyclicBarrier barrier = new CyclicBarrier(2);
this.barriers.add(barrier);
Blocker blocker = new Blocker();
this.blockers.add(blocker);
AsyncContext async = req.startAsync();
new Thread(() -> {
try {
barrier.await();
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
catch (BrokenBarrierException ex) {
}
blocker.await();
async.complete();
}).start();
}
private void admitOne() {
try {
this.barriers.take().await();
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
catch (BrokenBarrierException ex) {
throw new RuntimeException(ex);
}
private void admitOne() throws InterruptedException {
this.blockers.take().clear();
}
private void awaitQueue() throws InterruptedException {
while (this.barriers.isEmpty()) {
while (this.blockers.isEmpty()) {
Thread.sleep(100);
}
}
}
private static final class Blocker {
private boolean block = true;
private final Object monitor = new Object();
private void await() {
synchronized (this.monitor) {
while (this.block) {
try {
this.monitor.wait();
}
catch (InterruptedException ex) {
System.out.println("Interrupted!");
// Keep waiting
}
}
}
}
private void clear() {
synchronized (this.monitor) {
this.block = false;
this.monitor.notifyAll();
}
}
}
}