diff --git a/docs/changelog/131200.yaml b/docs/changelog/131200.yaml new file mode 100644 index 000000000000..49a88fa79f90 --- /dev/null +++ b/docs/changelog/131200.yaml @@ -0,0 +1,5 @@ +pr: 131200 +summary: Improve lost-increment message in repo analysis +area: Snapshot/Restore +type: enhancement +issues: [] diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/RepositoryAnalysisFailureIT.java b/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/RepositoryAnalysisFailureIT.java index 399a9eee0d75..40d1c12af906 100644 --- a/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/RepositoryAnalysisFailureIT.java +++ b/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/RepositoryAnalysisFailureIT.java @@ -76,6 +76,7 @@ import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.matchesPattern; import static org.hamcrest.Matchers.nullValue; public class RepositoryAnalysisFailureIT extends AbstractSnapshotIntegTestCase { @@ -385,6 +386,55 @@ public class RepositoryAnalysisFailureIT extends AbstractSnapshotIntegTestCase { assertAnalysisFailureMessage(analyseRepositoryExpectFailure(request).getMessage()); } + public void testFailsOnLostIncrement() { + final RepositoryAnalyzeAction.Request request = new RepositoryAnalyzeAction.Request("test-repo"); + final AtomicBoolean registerWasCorrupted = new AtomicBoolean(); + + blobStore.setDisruption(new Disruption() { + @Override + public BytesReference onContendedCompareAndExchange(BytesRegister register, BytesReference expected, BytesReference updated) { + if (expected.equals(updated) == false // not the initial read + && updated.length() == Long.BYTES // not the final write + && randomBoolean() + && register.get().equals(expected) // would have succeeded + && registerWasCorrupted.compareAndSet(false, true)) { + + // indicate success without actually applying the update + return expected; + } + + return register.compareAndExchange(expected, updated); + } + }); + + safeAwait((ActionListener l) -> analyseRepository(request, l.delegateResponse((ll, e) -> { + if (ExceptionsHelper.unwrapCause(e) instanceof RepositoryVerificationException repositoryVerificationException) { + assertAnalysisFailureMessage(repositoryVerificationException.getMessage()); + assertTrue( + "did not lose increment, so why did the verification fail?", + // clear flag for final assertion + registerWasCorrupted.compareAndSet(true, false) + ); + assertThat( + asInstanceOf( + RepositoryVerificationException.class, + ExceptionsHelper.unwrapCause(repositoryVerificationException.getCause()) + ).getMessage(), + matchesPattern(""" + \\[test-repo] Successfully completed all \\[.*] atomic increments of register \\[test-register-contended-.*] \ + so its expected value is \\[OptionalBytesReference\\[.*]], but reading its value with \\[.*] unexpectedly \ + yielded \\[OptionalBytesReference\\[.*]]\\. This anomaly may indicate an atomicity failure amongst concurrent \ + compare-and-exchange operations on registers in this repository\\.""") + ); + ll.onResponse(null); + } else { + ll.onFailure(e); + } + }))); + + assertFalse(registerWasCorrupted.get()); + } + public void testFailsIfRegisterHoldsSpuriousValue() { final RepositoryAnalyzeAction.Request request = new RepositoryAnalyzeAction.Request("test-repo"); diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/analyze/RepositoryAnalyzeAction.java b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/analyze/RepositoryAnalyzeAction.java index 5418a5081c44..fd5bacdf5592 100644 --- a/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/analyze/RepositoryAnalyzeAction.java +++ b/x-pack/plugin/snapshot-repo-test-kit/src/main/java/org/elasticsearch/repositories/blobstore/testkit/analyze/RepositoryAnalyzeAction.java @@ -42,6 +42,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ThrottledIterator; +import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; import org.elasticsearch.core.TimeValue; @@ -647,67 +648,110 @@ public class RepositoryAnalyzeAction extends HandledTransportAction { - if (isRunning()) { - final var expectedFinalRegisterValue = expectedRegisterValue.get(); - transportService.getThreadPool() - .executor(ThreadPool.Names.SNAPSHOT) - .execute(ActionRunnable.wrap(ActionListener.releaseAfter(new ActionListener() { - @Override - public void onResponse(OptionalBytesReference actualFinalRegisterValue) { - if (actualFinalRegisterValue.isPresent() == false - || longFromBytes(actualFinalRegisterValue.bytesReference()) != expectedFinalRegisterValue) { - fail( - new RepositoryVerificationException( - request.getRepositoryName(), - Strings.format( - "register [%s] should have value [%d] but instead had value [%s]", - registerName, - expectedFinalRegisterValue, - actualFinalRegisterValue - ) - ) - ); - } - } + return new Runnable() { - @Override - public void onFailure(Exception exp) { - // Registers are not supported on all repository types, and that's ok. - if (exp instanceof UnsupportedOperationException == false) { - fail(exp); + final CheckedConsumer, Exception> finalValueReader = switch (random.nextInt(3)) { + case 0 -> new CheckedConsumer, Exception>() { + @Override + public void accept(ActionListener listener) { + getBlobContainer().getRegister(OperationPurpose.REPOSITORY_ANALYSIS, registerName, listener); + } + + @Override + public String toString() { + return "getRegister"; + } + }; + case 1 -> new CheckedConsumer, Exception>() { + @Override + public void accept(ActionListener listener) { + getBlobContainer().compareAndExchangeRegister( + OperationPurpose.REPOSITORY_ANALYSIS, + registerName, + bytesFromLong(expectedFinalRegisterValue), + new BytesArray(new byte[] { (byte) 0xff }), + listener + ); + } + + @Override + public String toString() { + return "compareAndExchangeRegister"; + } + }; + case 2 -> new CheckedConsumer, Exception>() { + @Override + public void accept(ActionListener listener) { + getBlobContainer().compareAndSetRegister( + OperationPurpose.REPOSITORY_ANALYSIS, + registerName, + bytesFromLong(expectedFinalRegisterValue), + new BytesArray(new byte[] { (byte) 0xff }), + listener.map( + b -> b + ? OptionalBytesReference.of(bytesFromLong(expectedFinalRegisterValue)) + : OptionalBytesReference.MISSING + ) + ); + } + + @Override + public String toString() { + return "compareAndSetRegister"; + } + }; + default -> { + assert false; + throw new IllegalStateException(); + } + }; + + long expectedFinalRegisterValue = Long.MIN_VALUE; + + @Override + public void run() { + if (isRunning()) { + expectedFinalRegisterValue = expectedRegisterValue.get(); + transportService.getThreadPool() + .executor(ThreadPool.Names.SNAPSHOT) + .execute(ActionRunnable.wrap(ActionListener.releaseAfter(new ActionListener<>() { + @Override + public void onResponse(OptionalBytesReference actualFinalRegisterValue) { + if (actualFinalRegisterValue.isPresent() == false + || longFromBytes(actualFinalRegisterValue.bytesReference()) != expectedFinalRegisterValue) { + fail( + new RepositoryVerificationException( + request.getRepositoryName(), + Strings.format( + """ + Successfully completed all [%d] atomic increments of register [%s] so its expected \ + value is [%s], but reading its value with [%s] unexpectedly yielded [%s]. This \ + anomaly may indicate an atomicity failure amongst concurrent compare-and-exchange \ + operations on registers in this repository.""", + expectedFinalRegisterValue, + registerName, + OptionalBytesReference.of(bytesFromLong(expectedFinalRegisterValue)), + finalValueReader.toString(), + actualFinalRegisterValue + ) + ) + ); + } } - } - }, ref), listener -> { - switch (random.nextInt(3)) { - case 0 -> getBlobContainer().getRegister(OperationPurpose.REPOSITORY_ANALYSIS, registerName, listener); - case 1 -> getBlobContainer().compareAndExchangeRegister( - OperationPurpose.REPOSITORY_ANALYSIS, - registerName, - bytesFromLong(expectedFinalRegisterValue), - new BytesArray(new byte[] { (byte) 0xff }), - listener - ); - case 2 -> getBlobContainer().compareAndSetRegister( - OperationPurpose.REPOSITORY_ANALYSIS, - registerName, - bytesFromLong(expectedFinalRegisterValue), - new BytesArray(new byte[] { (byte) 0xff }), - listener.map( - b -> b - ? OptionalBytesReference.of(bytesFromLong(expectedFinalRegisterValue)) - : OptionalBytesReference.MISSING - ) - ); - default -> { - assert false; - throw new IllegalStateException(); + + @Override + public void onFailure(Exception exp) { + // Registers are not supported on all repository types, and that's ok. + if (exp instanceof UnsupportedOperationException == false) { + fail(exp); + } } - } - })); - } else { - ref.close(); + }, ref), finalValueReader)); + } else { + ref.close(); + } } + }; }