[Failure store] Reconciliate failure indices during snapshotting (#118834)

In this PR we reconciliate the failure indices of a data stream just
like we do for the backing indices. The only difference is that a data
stream can have an empty list of failure indices, while it cannot have
an empty list of backing indices.

An easy way to create a situation where certain backing or failure
indices are not included in a snapshot is via using exclusions in the
multi-target expression of the snapshot. For example:

```
PUT /_snapshot/my_repository/my-snapshot?wait_for_completion=true
{
  "indices": "my-ds*", "-.fs-my-ds-000001"
}
```
This commit is contained in:
Mary Gouseti 2024-12-19 12:20:12 +02:00 committed by GitHub
parent 1cf5b03b31
commit 4efeca83b4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 224 additions and 180 deletions

View File

@ -60,7 +60,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@ -77,6 +76,7 @@ import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
@ -145,18 +145,11 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
// Resolve backing index names after data streams have been created:
// (these names have a date component, and running around midnight could lead to test failures otherwise)
GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { "*" });
GetDataStreamAction.Response getDataStreamResponse = client.execute(GetDataStreamAction.INSTANCE, getDataStreamRequest).actionGet();
dsBackingIndexName = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices().get(0).getName();
otherDsBackingIndexName = getDataStreamResponse.getDataStreams().get(1).getDataStream().getIndices().get(0).getName();
fsBackingIndexName = getDataStreamResponse.getDataStreams().get(2).getDataStream().getIndices().get(0).getName();
fsFailureIndexName = getDataStreamResponse.getDataStreams()
.get(2)
.getDataStream()
.getFailureIndices()
.getIndices()
.get(0)
.getName();
List<GetDataStreamAction.Response.DataStreamInfo> dataStreamInfos = getDataStreamInfo("*");
dsBackingIndexName = dataStreamInfos.get(0).getDataStream().getIndices().get(0).getName();
otherDsBackingIndexName = dataStreamInfos.get(1).getDataStream().getIndices().get(0).getName();
fsBackingIndexName = dataStreamInfos.get(2).getDataStream().getIndices().get(0).getName();
fsFailureIndexName = dataStreamInfos.get(2).getDataStream().getFailureIndices().getIndices().get(0).getName();
// Will be used in some tests, to test renaming while restoring a snapshot:
ds2BackingIndexName = dsBackingIndexName.replace("-ds-", "-ds2-");
@ -198,9 +191,7 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
assertEquals(Collections.singletonList(dsBackingIndexName), getSnapshot(REPO, SNAPSHOT).indices());
assertAcked(
client.execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { "ds" }))
);
assertAcked(client.execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(TEST_REQUEST_TIMEOUT, "ds")));
RestoreSnapshotResponse restoreSnapshotResponse = client.admin()
.cluster()
@ -218,13 +209,10 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
assertEquals(DOCUMENT_SOURCE, hits[0].getSourceAsMap());
});
GetDataStreamAction.Response ds = client.execute(
GetDataStreamAction.INSTANCE,
new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { "ds" })
).get();
assertEquals(1, ds.getDataStreams().size());
assertEquals(1, ds.getDataStreams().get(0).getDataStream().getIndices().size());
assertEquals(dsBackingIndexName, ds.getDataStreams().get(0).getDataStream().getIndices().get(0).getName());
List<GetDataStreamAction.Response.DataStreamInfo> ds = getDataStreamInfo("ds");
assertEquals(1, ds.size());
assertEquals(1, ds.get(0).getDataStream().getIndices().size());
assertEquals(dsBackingIndexName, ds.get(0).getDataStream().getIndices().get(0).getName());
GetAliasesResponse getAliasesResponse = client.admin().indices().getAliases(new GetAliasesRequest("my-alias")).actionGet();
assertThat(getAliasesResponse.getDataStreamAliases().keySet(), containsInAnyOrder("ds", "other-ds"));
@ -278,19 +266,18 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
assertEquals(DOCUMENT_SOURCE, hits[0].getSourceAsMap());
});
GetDataStreamAction.Request getDataSteamRequest = new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { "*" });
GetDataStreamAction.Response ds = client.execute(GetDataStreamAction.INSTANCE, getDataSteamRequest).get();
List<GetDataStreamAction.Response.DataStreamInfo> dataStreamInfos = getDataStreamInfo("*");
assertThat(
ds.getDataStreams().stream().map(e -> e.getDataStream().getName()).collect(Collectors.toList()),
dataStreamInfos.stream().map(e -> e.getDataStream().getName()).collect(Collectors.toList()),
contains(equalTo("ds"), equalTo("other-ds"), equalTo("with-fs"))
);
List<Index> backingIndices = ds.getDataStreams().get(0).getDataStream().getIndices();
List<Index> backingIndices = dataStreamInfos.get(0).getDataStream().getIndices();
assertThat(backingIndices.stream().map(Index::getName).collect(Collectors.toList()), contains(dsBackingIndexName));
backingIndices = ds.getDataStreams().get(1).getDataStream().getIndices();
backingIndices = dataStreamInfos.get(1).getDataStream().getIndices();
assertThat(backingIndices.stream().map(Index::getName).collect(Collectors.toList()), contains(otherDsBackingIndexName));
backingIndices = ds.getDataStreams().get(2).getDataStream().getIndices();
backingIndices = dataStreamInfos.get(2).getDataStream().getIndices();
assertThat(backingIndices.stream().map(Index::getName).collect(Collectors.toList()), contains(fsBackingIndexName));
List<Index> failureIndices = ds.getDataStreams().get(2).getDataStream().getFailureIndices().getIndices();
List<Index> failureIndices = dataStreamInfos.get(2).getDataStream().getFailureIndices().getIndices();
assertThat(failureIndices.stream().map(Index::getName).collect(Collectors.toList()), contains(fsFailureIndexName));
}
@ -337,14 +324,10 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
assertEquals(DOCUMENT_SOURCE, hits[0].getSourceAsMap());
});
GetDataStreamAction.Request getDataSteamRequest = new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { "ds" });
GetDataStreamAction.Response ds = client.execute(GetDataStreamAction.INSTANCE, getDataSteamRequest).actionGet();
assertThat(
ds.getDataStreams().stream().map(e -> e.getDataStream().getName()).collect(Collectors.toList()),
contains(equalTo("ds"))
);
List<Index> backingIndices = ds.getDataStreams().get(0).getDataStream().getIndices();
assertThat(ds.getDataStreams().get(0).getDataStream().getIndices(), hasSize(1));
List<GetDataStreamAction.Response.DataStreamInfo> dsInfo = getDataStreamInfo("ds");
assertThat(dsInfo.stream().map(e -> e.getDataStream().getName()).collect(Collectors.toList()), contains(equalTo("ds")));
List<Index> backingIndices = dsInfo.get(0).getDataStream().getIndices();
assertThat(dsInfo.get(0).getDataStream().getIndices(), hasSize(1));
assertThat(backingIndices.stream().map(Index::getName).collect(Collectors.toList()), contains(equalTo(dsBackingIndexName)));
// The backing index created as part of rollover should still exist (but just not part of the data stream)
@ -357,39 +340,40 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
}
public void testFailureStoreSnapshotAndRestore() throws Exception {
String dataStreamName = "with-fs";
CreateSnapshotResponse createSnapshotResponse = client.admin()
.cluster()
.prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, REPO, SNAPSHOT)
.setWaitForCompletion(true)
.setIndices("with-fs")
.setIndices(dataStreamName)
.setIncludeGlobalState(false)
.get();
RestStatus status = createSnapshotResponse.getSnapshotInfo().status();
assertEquals(RestStatus.OK, status);
assertThat(getSnapshot(REPO, SNAPSHOT).dataStreams(), containsInAnyOrder(dataStreamName));
assertThat(getSnapshot(REPO, SNAPSHOT).indices(), containsInAnyOrder(fsBackingIndexName, fsFailureIndexName));
assertAcked(client.execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(TEST_REQUEST_TIMEOUT, "with-fs")));
assertAcked(
client.execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(TEST_REQUEST_TIMEOUT, dataStreamName))
);
{
RestoreSnapshotResponse restoreSnapshotResponse = client.admin()
.cluster()
.prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, REPO, SNAPSHOT)
.setWaitForCompletion(true)
.setIndices("with-fs")
.setIndices(dataStreamName)
.get();
assertEquals(2, restoreSnapshotResponse.getRestoreInfo().successfulShards());
GetDataStreamAction.Response ds = client.execute(
GetDataStreamAction.INSTANCE,
new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { "with-fs" })
).get();
assertEquals(1, ds.getDataStreams().size());
assertEquals(1, ds.getDataStreams().get(0).getDataStream().getIndices().size());
assertEquals(fsBackingIndexName, ds.getDataStreams().get(0).getDataStream().getIndices().get(0).getName());
assertEquals(fsFailureIndexName, ds.getDataStreams().get(0).getDataStream().getFailureIndices().getIndices().get(0).getName());
List<GetDataStreamAction.Response.DataStreamInfo> dataStreamInfos = getDataStreamInfo(dataStreamName);
assertEquals(1, dataStreamInfos.size());
assertEquals(1, dataStreamInfos.get(0).getDataStream().getIndices().size());
assertEquals(fsBackingIndexName, dataStreamInfos.get(0).getDataStream().getIndices().get(0).getName());
assertEquals(fsFailureIndexName, dataStreamInfos.get(0).getDataStream().getFailureIndices().getIndices().get(0).getName());
}
{
// With rename pattern
@ -397,21 +381,18 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
.cluster()
.prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, REPO, SNAPSHOT)
.setWaitForCompletion(true)
.setIndices("with-fs")
.setIndices(dataStreamName)
.setRenamePattern("-fs")
.setRenameReplacement("-fs2")
.get();
assertEquals(2, restoreSnapshotResponse.getRestoreInfo().successfulShards());
GetDataStreamAction.Response ds = client.execute(
GetDataStreamAction.INSTANCE,
new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { "with-fs2" })
).get();
assertEquals(1, ds.getDataStreams().size());
assertEquals(1, ds.getDataStreams().get(0).getDataStream().getIndices().size());
assertEquals(fs2BackingIndexName, ds.getDataStreams().get(0).getDataStream().getIndices().get(0).getName());
assertEquals(fs2FailureIndexName, ds.getDataStreams().get(0).getDataStream().getFailureIndices().getIndices().get(0).getName());
List<GetDataStreamAction.Response.DataStreamInfo> dataStreamInfos = getDataStreamInfo("with-fs2");
assertEquals(1, dataStreamInfos.size());
assertEquals(1, dataStreamInfos.get(0).getDataStream().getIndices().size());
assertEquals(fs2BackingIndexName, dataStreamInfos.get(0).getDataStream().getIndices().get(0).getName());
assertEquals(fs2FailureIndexName, dataStreamInfos.get(0).getDataStream().getFailureIndices().getIndices().get(0).getName());
}
}
@ -477,13 +458,10 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
assertEquals(DOCUMENT_SOURCE, hits[0].getSourceAsMap());
});
GetDataStreamAction.Response ds = client.execute(
GetDataStreamAction.INSTANCE,
new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { dataStreamToSnapshot })
).get();
assertEquals(1, ds.getDataStreams().size());
assertEquals(1, ds.getDataStreams().get(0).getDataStream().getIndices().size());
assertEquals(backingIndexName, ds.getDataStreams().get(0).getDataStream().getIndices().get(0).getName());
List<GetDataStreamAction.Response.DataStreamInfo> dataStreamInfos = getDataStreamInfo(dataStreamToSnapshot);
assertEquals(1, dataStreamInfos.size());
assertEquals(1, dataStreamInfos.get(0).getDataStream().getIndices().size());
assertEquals(backingIndexName, dataStreamInfos.get(0).getDataStream().getIndices().get(0).getName());
GetAliasesResponse getAliasesResponse = client.admin().indices().getAliases(new GetAliasesRequest("my-alias")).actionGet();
assertThat(getAliasesResponse.getDataStreamAliases().keySet(), contains(dataStreamToSnapshot));
@ -536,13 +514,10 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
assertEquals(DOCUMENT_SOURCE, hits[0].getSourceAsMap());
});
GetDataStreamAction.Response ds = client.execute(
GetDataStreamAction.INSTANCE,
new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { "*" })
).get();
assertEquals(3, ds.getDataStreams().size());
List<GetDataStreamAction.Response.DataStreamInfo> dataStreamInfos = getDataStreamInfo("*");
assertEquals(3, dataStreamInfos.size());
assertThat(
ds.getDataStreams().stream().map(i -> i.getDataStream().getName()).collect(Collectors.toList()),
dataStreamInfos.stream().map(i -> i.getDataStream().getName()).collect(Collectors.toList()),
containsInAnyOrder("ds", "other-ds", "with-fs")
);
@ -596,19 +571,16 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
assertEquals(DOCUMENT_SOURCE, hits[0].getSourceAsMap());
});
GetDataStreamAction.Response ds = client.execute(
GetDataStreamAction.INSTANCE,
new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { "*" })
).get();
assertEquals(3, ds.getDataStreams().size());
assertEquals(1, ds.getDataStreams().get(0).getDataStream().getIndices().size());
assertEquals(dsBackingIndexName, ds.getDataStreams().get(0).getDataStream().getIndices().get(0).getName());
assertEquals(1, ds.getDataStreams().get(1).getDataStream().getIndices().size());
assertEquals(otherDsBackingIndexName, ds.getDataStreams().get(1).getDataStream().getIndices().get(0).getName());
assertEquals(1, ds.getDataStreams().get(2).getDataStream().getIndices().size());
assertEquals(fsBackingIndexName, ds.getDataStreams().get(2).getDataStream().getIndices().get(0).getName());
assertEquals(1, ds.getDataStreams().get(2).getDataStream().getFailureIndices().getIndices().size());
assertEquals(fsFailureIndexName, ds.getDataStreams().get(2).getDataStream().getFailureIndices().getIndices().get(0).getName());
List<GetDataStreamAction.Response.DataStreamInfo> dataStreamInfos = getDataStreamInfo("*");
assertEquals(3, dataStreamInfos.size());
assertEquals(1, dataStreamInfos.get(0).getDataStream().getIndices().size());
assertEquals(dsBackingIndexName, dataStreamInfos.get(0).getDataStream().getIndices().get(0).getName());
assertEquals(1, dataStreamInfos.get(1).getDataStream().getIndices().size());
assertEquals(otherDsBackingIndexName, dataStreamInfos.get(1).getDataStream().getIndices().get(0).getName());
assertEquals(1, dataStreamInfos.get(2).getDataStream().getIndices().size());
assertEquals(fsBackingIndexName, dataStreamInfos.get(2).getDataStream().getIndices().get(0).getName());
assertEquals(1, dataStreamInfos.get(2).getDataStream().getFailureIndices().getIndices().size());
assertEquals(fsFailureIndexName, dataStreamInfos.get(2).getDataStream().getFailureIndices().getIndices().get(0).getName());
GetAliasesResponse getAliasesResponse = client.admin().indices().getAliases(new GetAliasesRequest("my-alias")).actionGet();
assertThat(getAliasesResponse.getDataStreamAliases().keySet(), containsInAnyOrder("ds", "other-ds"));
@ -667,19 +639,16 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
assertEquals(DOCUMENT_SOURCE, hits[0].getSourceAsMap());
});
GetDataStreamAction.Response ds = client.execute(
GetDataStreamAction.INSTANCE,
new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { "*" })
).get();
assertEquals(3, ds.getDataStreams().size());
assertEquals(1, ds.getDataStreams().get(0).getDataStream().getIndices().size());
assertEquals(dsBackingIndexName, ds.getDataStreams().get(0).getDataStream().getIndices().get(0).getName());
assertEquals(1, ds.getDataStreams().get(1).getDataStream().getIndices().size());
assertEquals(otherDsBackingIndexName, ds.getDataStreams().get(1).getDataStream().getIndices().get(0).getName());
assertEquals(1, ds.getDataStreams().get(2).getDataStream().getIndices().size());
assertEquals(fsBackingIndexName, ds.getDataStreams().get(2).getDataStream().getIndices().get(0).getName());
assertEquals(1, ds.getDataStreams().get(2).getDataStream().getIndices().size());
assertEquals(fsFailureIndexName, ds.getDataStreams().get(2).getDataStream().getFailureIndices().getIndices().get(0).getName());
List<GetDataStreamAction.Response.DataStreamInfo> dataStreamInfos = getDataStreamInfo("*");
assertEquals(3, dataStreamInfos.size());
assertEquals(1, dataStreamInfos.get(0).getDataStream().getIndices().size());
assertEquals(dsBackingIndexName, dataStreamInfos.get(0).getDataStream().getIndices().get(0).getName());
assertEquals(1, dataStreamInfos.get(1).getDataStream().getIndices().size());
assertEquals(otherDsBackingIndexName, dataStreamInfos.get(1).getDataStream().getIndices().get(0).getName());
assertEquals(1, dataStreamInfos.get(2).getDataStream().getIndices().size());
assertEquals(fsBackingIndexName, dataStreamInfos.get(2).getDataStream().getIndices().get(0).getName());
assertEquals(1, dataStreamInfos.get(2).getDataStream().getIndices().size());
assertEquals(fsFailureIndexName, dataStreamInfos.get(2).getDataStream().getFailureIndices().getIndices().get(0).getName());
GetAliasesResponse getAliasesResponse = client.admin().indices().getAliases(new GetAliasesRequest("*")).actionGet();
assertThat(getAliasesResponse.getDataStreamAliases(), anEmptyMap());
@ -721,13 +690,10 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
.setRenameReplacement("ds2")
.get();
GetDataStreamAction.Response ds = client.execute(
GetDataStreamAction.INSTANCE,
new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { "ds2" })
).get();
assertEquals(1, ds.getDataStreams().size());
assertEquals(1, ds.getDataStreams().get(0).getDataStream().getIndices().size());
assertEquals(ds2BackingIndexName, ds.getDataStreams().get(0).getDataStream().getIndices().get(0).getName());
List<GetDataStreamAction.Response.DataStreamInfo> dataStreamInfos = getDataStreamInfo("ds2");
assertEquals(1, dataStreamInfos.size());
assertEquals(1, dataStreamInfos.get(0).getDataStream().getIndices().size());
assertEquals(ds2BackingIndexName, dataStreamInfos.get(0).getDataStream().getIndices().get(0).getName());
assertResponse(
client.prepareSearch("ds2"),
response -> assertEquals(DOCUMENT_SOURCE, response.getHits().getHits()[0].getSourceAsMap())
@ -779,13 +745,10 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
.setRenameReplacement("other-ds2")
.get();
GetDataStreamAction.Response ds = client.execute(
GetDataStreamAction.INSTANCE,
new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { "other-ds2" })
).get();
assertEquals(1, ds.getDataStreams().size());
assertEquals(1, ds.getDataStreams().get(0).getDataStream().getIndices().size());
assertEquals(otherDs2BackingIndexName, ds.getDataStreams().get(0).getDataStream().getIndices().get(0).getName());
List<GetDataStreamAction.Response.DataStreamInfo> dataStreamInfos = getDataStreamInfo("other-ds2");
assertEquals(1, dataStreamInfos.size());
assertEquals(1, dataStreamInfos.get(0).getDataStream().getIndices().size());
assertEquals(otherDs2BackingIndexName, dataStreamInfos.get(0).getDataStream().getIndices().get(0).getName());
GetAliasesResponse getAliasesResponse = client.admin().indices().getAliases(new GetAliasesRequest("my-alias")).actionGet();
assertThat(getAliasesResponse.getDataStreamAliases().keySet(), containsInAnyOrder("ds", "other-ds", "other-ds2"));
@ -849,9 +812,8 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
assertThat(restoreSnapshotResponse.status(), is(RestStatus.OK));
GetDataStreamAction.Request getDSRequest = new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { "ds" });
GetDataStreamAction.Response response = client.execute(GetDataStreamAction.INSTANCE, getDSRequest).actionGet();
assertThat(response.getDataStreams().get(0).getDataStream().getIndices().get(0).getName(), is(dsBackingIndexName));
List<GetDataStreamAction.Response.DataStreamInfo> dataStreamInfos = getDataStreamInfo("ds");
assertThat(dataStreamInfos.get(0).getDataStream().getIndices().get(0).getName(), is(dsBackingIndexName));
}
public void testDataStreamAndBackingIndicesAreRenamedUsingRegex() {
@ -888,17 +850,15 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
assertThat(restoreSnapshotResponse.status(), is(RestStatus.OK));
// assert "ds" was restored as "test-ds" and the backing index has a valid name
GetDataStreamAction.Request getRenamedDS = new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { "test-ds" });
GetDataStreamAction.Response response = client.execute(GetDataStreamAction.INSTANCE, getRenamedDS).actionGet();
List<GetDataStreamAction.Response.DataStreamInfo> dataStreamInfos = getDataStreamInfo("test-ds");
assertThat(
response.getDataStreams().get(0).getDataStream().getIndices().get(0).getName(),
dataStreamInfos.get(0).getDataStream().getIndices().get(0).getName(),
is(DataStream.getDefaultBackingIndexName("test-ds", 1L))
);
// data stream "ds" should still exist in the system
GetDataStreamAction.Request getDSRequest = new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { "ds" });
response = client.execute(GetDataStreamAction.INSTANCE, getDSRequest).actionGet();
assertThat(response.getDataStreams().get(0).getDataStream().getIndices().get(0).getName(), is(dsBackingIndexName));
dataStreamInfos = getDataStreamInfo("ds");
assertThat(dataStreamInfos.get(0).getDataStream().getIndices().get(0).getName(), is(dsBackingIndexName));
}
public void testWildcards() throws Exception {
@ -924,16 +884,13 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
assertEquals(RestStatus.OK, restoreSnapshotResponse.status());
GetDataStreamAction.Response ds = client.execute(
GetDataStreamAction.INSTANCE,
new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { "ds2" })
).get();
assertEquals(1, ds.getDataStreams().size());
assertEquals(1, ds.getDataStreams().get(0).getDataStream().getIndices().size());
assertEquals(ds2BackingIndexName, ds.getDataStreams().get(0).getDataStream().getIndices().get(0).getName());
List<GetDataStreamAction.Response.DataStreamInfo> dataStreamInfos = getDataStreamInfo("ds2");
assertEquals(1, dataStreamInfos.size());
assertEquals(1, dataStreamInfos.get(0).getDataStream().getIndices().size());
assertEquals(ds2BackingIndexName, dataStreamInfos.get(0).getDataStream().getIndices().get(0).getName());
assertThat(
"we renamed the restored data stream to one that doesn't match any existing composable template",
ds.getDataStreams().get(0).getIndexTemplate(),
dataStreamInfos.get(0).getIndexTemplate(),
is(nullValue())
);
}
@ -955,7 +912,7 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
);
}
public void testDataStreamNotRestoredWhenIndexRequested() throws Exception {
public void testDataStreamNotRestoredWhenIndexRequested() {
CreateSnapshotResponse createSnapshotResponse = client.admin()
.cluster()
.prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, REPO, "snap2")
@ -984,7 +941,7 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
expectThrows(ResourceNotFoundException.class, client.execute(GetDataStreamAction.INSTANCE, getRequest));
}
public void testDataStreamNotIncludedInLimitedSnapshot() throws ExecutionException, InterruptedException {
public void testDataStreamNotIncludedInLimitedSnapshot() {
final String snapshotName = "test-snap";
CreateSnapshotResponse createSnapshotResponse = client.admin()
.cluster()
@ -1042,12 +999,7 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
assertDocCount(dataStream, 100L);
// Resolve backing index name after the data stream has been created because it has a date component,
// and running around midnight could lead to test failures otherwise
GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(
TEST_REQUEST_TIMEOUT,
new String[] { dataStream }
);
GetDataStreamAction.Response getDataStreamResponse = client.execute(GetDataStreamAction.INSTANCE, getDataStreamRequest).actionGet();
String backingIndexName = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices().get(0).getName();
String backingIndexName = getDataStreamInfo(dataStream).get(0).getDataStream().getIndices().get(0).getName();
logger.info("--> snapshot");
ActionFuture<CreateSnapshotResponse> future = client1.admin()
@ -1235,7 +1187,7 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
assertEquals(restoreSnapshotResponse.failedShards(), 0);
}
public void testExcludeDSFromSnapshotWhenExcludingItsIndices() {
public void testExcludeDSFromSnapshotWhenExcludingAnyOfItsIndices() {
final String snapshot = "test-snapshot";
final String indexWithoutDataStream = "test-idx-no-ds";
createIndexWithContent(indexWithoutDataStream);
@ -1251,10 +1203,47 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
.getRestoreInfo();
assertThat(restoreInfo.failedShards(), is(0));
assertThat(restoreInfo.successfulShards(), is(1));
// Exclude only failure store indices
{
String dataStreamName = "with-fs";
CreateSnapshotResponse createSnapshotResponse = client.admin()
.cluster()
.prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, REPO, SNAPSHOT)
.setWaitForCompletion(true)
.setIndices(dataStreamName + "*", "-.fs*")
.setIncludeGlobalState(false)
.get();
RestStatus status = createSnapshotResponse.getSnapshotInfo().status();
assertEquals(RestStatus.OK, status);
SnapshotInfo retrievedSnapshot = getSnapshot(REPO, SNAPSHOT);
assertThat(retrievedSnapshot.dataStreams(), contains(dataStreamName));
assertThat(retrievedSnapshot.indices(), containsInAnyOrder(fsBackingIndexName));
assertAcked(
safeGet(client.execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(TEST_REQUEST_TIMEOUT, "*")))
);
RestoreInfo restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, REPO, SNAPSHOT)
.setWaitForCompletion(true)
.setIndices(dataStreamName)
.get()
.getRestoreInfo();
assertThat(restoreSnapshotResponse, notNullValue());
assertThat(restoreSnapshotResponse.successfulShards(), equalTo(restoreSnapshotResponse.totalShards()));
assertThat(restoreSnapshotResponse.failedShards(), is(0));
GetDataStreamAction.Response.DataStreamInfo dataStream = getDataStreamInfo(dataStreamName).getFirst();
assertThat(dataStream.getDataStream().getBackingIndices().getIndices(), not(empty()));
assertThat(dataStream.getDataStream().getFailureIndices().getIndices(), empty());
}
}
/**
* This test is a copy of the {@link #testExcludeDSFromSnapshotWhenExcludingItsIndices()} the only difference
* This test is a copy of the {@link #testExcludeDSFromSnapshotWhenExcludingAnyOfItsIndices()} ()} the only difference
* is that one include the global state and one doesn't. In general this shouldn't matter that's why it used to be
* a random parameter of the test, but because of #107515 it fails when we include the global state. Keep them
* separate until this is fixed.
@ -1284,10 +1273,7 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
createIndexWithContent(indexName);
createFullSnapshot(REPO, snapshotName);
assertAcked(
client.execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { "*" }))
.get()
);
assertAcked(client.execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(TEST_REQUEST_TIMEOUT, "*")).get());
assertAcked(client.admin().indices().prepareDelete("*").setIndicesOptions(IndicesOptions.lenientExpandOpenHidden()).get());
RestoreSnapshotResponse restoreSnapshotResponse = client.admin()
@ -1297,8 +1283,7 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
.get();
assertEquals(RestStatus.OK, restoreSnapshotResponse.status());
GetDataStreamAction.Request getRequest = new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { "*" });
assertThat(client.execute(GetDataStreamAction.INSTANCE, getRequest).get().getDataStreams(), hasSize(3));
assertThat(getDataStreamInfo("*"), hasSize(3));
assertNotNull(client.admin().indices().prepareGetIndex().setIndices(indexName).get());
}
@ -1326,7 +1311,7 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
}
}
public void testRestoreDataStreamAliasWithConflictingIndicesAlias() throws Exception {
public void testRestoreDataStreamAliasWithConflictingIndicesAlias() {
var snapshotName = "test-snapshot";
createFullSnapshot(REPO, snapshotName);
client.execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(TEST_REQUEST_TIMEOUT, "*")).actionGet();
@ -1484,4 +1469,8 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
}
protected List<GetDataStreamAction.Response.DataStreamInfo> getDataStreamInfo(String... dataStreamNames) {
GetDataStreamAction.Request getRequest = new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, dataStreamNames);
return safeGet(client.execute(GetDataStreamAction.INSTANCE, getRequest)).getDataStreams();
}
}

View File

@ -49,7 +49,6 @@ import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@ -794,27 +793,57 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
/**
* Reconciles this data stream with a list of indices available in a snapshot. Allows snapshots to store accurate data
* stream definitions that do not reference backing indices not contained in the snapshot.
* stream definitions that do not reference backing indices and failure indices not contained in the snapshot.
*
* @param indicesInSnapshot List of indices in the snapshot
* @param snapshotMetadataBuilder a metadata builder with the current view of the snapshot metadata
* @return Reconciled {@link DataStream} instance or {@code null} if no reconciled version of this data stream could be built from the
* given indices
*/
@Nullable
public DataStream snapshot(Collection<String> indicesInSnapshot) {
public DataStream snapshot(Set<String> indicesInSnapshot, Metadata.Builder snapshotMetadataBuilder) {
boolean backingIndicesChanged = false;
boolean failureIndicesChanged = false;
// do not include indices not available in the snapshot
List<Index> reconciledIndices = new ArrayList<>(this.backingIndices.indices);
if (reconciledIndices.removeIf(x -> indicesInSnapshot.contains(x.getName()) == false) == false) {
List<Index> reconciledBackingIndices = this.backingIndices.indices;
if (isAnyIndexMissing(this.backingIndices.getIndices(), snapshotMetadataBuilder, indicesInSnapshot)) {
reconciledBackingIndices = new ArrayList<>(this.backingIndices.indices);
backingIndicesChanged = reconciledBackingIndices.removeIf(x -> indicesInSnapshot.contains(x.getName()) == false);
if (reconciledBackingIndices.isEmpty()) {
return null;
}
}
List<Index> reconciledFailureIndices = this.failureIndices.indices;
if (DataStream.isFailureStoreFeatureFlagEnabled()
&& isAnyIndexMissing(failureIndices.indices, snapshotMetadataBuilder, indicesInSnapshot)) {
reconciledFailureIndices = new ArrayList<>(this.failureIndices.indices);
failureIndicesChanged = reconciledFailureIndices.removeIf(x -> indicesInSnapshot.contains(x.getName()) == false);
}
if (backingIndicesChanged == false && failureIndicesChanged == false) {
return this;
}
if (reconciledIndices.size() == 0) {
return null;
Builder builder = copy();
if (backingIndicesChanged) {
builder.setBackingIndices(backingIndices.copy().setIndices(reconciledBackingIndices).build());
}
if (failureIndicesChanged) {
builder.setFailureIndices(failureIndices.copy().setIndices(reconciledFailureIndices).build());
}
return builder.setMetadata(metadata == null ? null : new HashMap<>(metadata)).build();
}
return copy().setBackingIndices(backingIndices.copy().setIndices(reconciledIndices).build())
.setMetadata(metadata == null ? null : new HashMap<>(metadata))
.build();
private static boolean isAnyIndexMissing(List<Index> indices, Metadata.Builder builder, Set<String> indicesInSnapshot) {
for (Index index : indices) {
final String indexName = index.getName();
if (builder.get(indexName) == null || indicesInSnapshot.contains(indexName) == false) {
return true;
}
}
return false;
}
/**

View File

@ -786,15 +786,7 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
assert snapshot.partial()
: "Data stream [" + dataStreamName + "] was deleted during a snapshot but snapshot was not partial.";
} else {
boolean missingIndex = false;
for (Index index : dataStream.getIndices()) {
final String indexName = index.getName();
if (builder.get(indexName) == null || indicesInSnapshot.contains(indexName) == false) {
missingIndex = true;
break;
}
}
final DataStream reconciled = missingIndex ? dataStream.snapshot(indicesInSnapshot) : dataStream;
final DataStream reconciled = dataStream.snapshot(indicesInSnapshot, builder);
if (reconciled != null) {
dataStreams.put(dataStreamName, reconciled);
}

View File

@ -45,8 +45,10 @@ import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import static org.elasticsearch.cluster.metadata.DataStream.getDefaultBackingIndexName;
import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.newInstance;
@ -866,23 +868,39 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
}
public void testSnapshot() {
var preSnapshotDataStream = DataStreamTestHelper.randomInstance();
var indicesToRemove = randomSubsetOf(preSnapshotDataStream.getIndices());
if (indicesToRemove.size() == preSnapshotDataStream.getIndices().size()) {
var preSnapshotDataStream = DataStreamTestHelper.randomInstance(true);
// Mutate backing indices
var backingIndicesToRemove = randomSubsetOf(preSnapshotDataStream.getIndices());
if (backingIndicesToRemove.size() == preSnapshotDataStream.getIndices().size()) {
// never remove them all
indicesToRemove.remove(0);
backingIndicesToRemove.remove(0);
}
var indicesToAdd = randomIndexInstances();
var postSnapshotIndices = new ArrayList<>(preSnapshotDataStream.getIndices());
postSnapshotIndices.removeAll(indicesToRemove);
postSnapshotIndices.addAll(indicesToAdd);
var backingIndicesToAdd = randomIndexInstances();
var postSnapshotBackingIndices = new ArrayList<>(preSnapshotDataStream.getIndices());
postSnapshotBackingIndices.removeAll(backingIndicesToRemove);
postSnapshotBackingIndices.addAll(backingIndicesToAdd);
// Mutate failure indices
var failureIndicesToRemove = randomSubsetOf(preSnapshotDataStream.getFailureIndices().getIndices());
var failureIndicesToAdd = randomIndexInstances();
var postSnapshotFailureIndices = new ArrayList<>(preSnapshotDataStream.getFailureIndices().getIndices());
postSnapshotFailureIndices.removeAll(failureIndicesToRemove);
postSnapshotFailureIndices.addAll(failureIndicesToAdd);
var replicated = preSnapshotDataStream.isReplicated() && randomBoolean();
var postSnapshotDataStream = preSnapshotDataStream.copy()
.setBackingIndices(
preSnapshotDataStream.getBackingIndices()
.copy()
.setIndices(postSnapshotIndices)
.setIndices(postSnapshotBackingIndices)
.setRolloverOnWrite(replicated == false && preSnapshotDataStream.rolloverOnWrite())
.build()
)
.setFailureIndices(
preSnapshotDataStream.getFailureIndices()
.copy()
.setIndices(postSnapshotFailureIndices)
.setRolloverOnWrite(replicated == false && preSnapshotDataStream.rolloverOnWrite())
.build()
)
@ -891,9 +909,10 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
.setReplicated(replicated)
.build();
var reconciledDataStream = postSnapshotDataStream.snapshot(
preSnapshotDataStream.getIndices().stream().map(Index::getName).toList()
);
Set<String> indicesInSnapshot = new HashSet<>();
preSnapshotDataStream.getIndices().forEach(index -> indicesInSnapshot.add(index.getName()));
preSnapshotDataStream.getFailureIndices().getIndices().forEach(index -> indicesInSnapshot.add(index.getName()));
var reconciledDataStream = postSnapshotDataStream.snapshot(indicesInSnapshot, Metadata.builder());
assertThat(reconciledDataStream.getName(), equalTo(postSnapshotDataStream.getName()));
assertThat(reconciledDataStream.getGeneration(), equalTo(postSnapshotDataStream.getGeneration()));
@ -907,9 +926,19 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
}
assertThat(reconciledDataStream.isHidden(), equalTo(postSnapshotDataStream.isHidden()));
assertThat(reconciledDataStream.isReplicated(), equalTo(postSnapshotDataStream.isReplicated()));
assertThat(reconciledDataStream.getIndices(), everyItem(not(in(indicesToRemove))));
assertThat(reconciledDataStream.getIndices(), everyItem(not(in(indicesToAdd))));
assertThat(reconciledDataStream.getIndices().size(), equalTo(preSnapshotDataStream.getIndices().size() - indicesToRemove.size()));
assertThat(reconciledDataStream.getIndices(), everyItem(not(in(backingIndicesToRemove))));
assertThat(reconciledDataStream.getIndices(), everyItem(not(in(backingIndicesToAdd))));
assertThat(
reconciledDataStream.getIndices().size(),
equalTo(preSnapshotDataStream.getIndices().size() - backingIndicesToRemove.size())
);
var reconciledFailureIndices = reconciledDataStream.getFailureIndices().getIndices();
assertThat(reconciledFailureIndices, everyItem(not(in(failureIndicesToRemove))));
assertThat(reconciledFailureIndices, everyItem(not(in(failureIndicesToAdd))));
assertThat(
reconciledFailureIndices.size(),
equalTo(preSnapshotDataStream.getFailureIndices().getIndices().size() - failureIndicesToRemove.size())
);
}
public void testSnapshotWithAllBackingIndicesRemoved() {
@ -920,7 +949,12 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
.setBackingIndices(preSnapshotDataStream.getBackingIndices().copy().setIndices(indicesToAdd).build())
.build();
assertNull(postSnapshotDataStream.snapshot(preSnapshotDataStream.getIndices().stream().map(Index::getName).toList()));
assertNull(
postSnapshotDataStream.snapshot(
preSnapshotDataStream.getIndices().stream().map(Index::getName).collect(Collectors.toSet()),
Metadata.builder()
)
);
}
public void testSelectTimeSeriesWriteIndex() {