Allow missing shard stats for restarted nodes for _snapshot/_status (#128399)
Returns an empty shard stats for shard entries where stats were unavailable in the case where a node has been restarted or left the cluster. The change adds a 'description' field to the SnapshotIndexShardStatus class that is used to include a message indicating why the stats are empty. This change was motivated by a desire to reduce latency for getting the stats for currently running snapshots. The stats can still be loaded from the repository via a _snapshot/<repository>/snapshot/_status call. Closes ES-10982 Co-authored-by: Dianna Hohensee <artemisapple@gmail.com>
This commit is contained in:
parent
0cac912ff2
commit
1b49eabc98
|
@ -0,0 +1,5 @@
|
|||
pr: 128399
|
||||
summary: Allow missing shard stats for restarted nodes for `_snapshot/_status`
|
||||
area: Snapshot/Restore
|
||||
type: enhancement
|
||||
issues: []
|
|
@ -13,7 +13,6 @@ import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRes
|
|||
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
|
||||
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStage;
|
||||
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStatus;
|
||||
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStats;
|
||||
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
|
||||
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
|
||||
import org.elasticsearch.action.support.GroupedActionListener;
|
||||
|
@ -212,7 +211,8 @@ public class SnapshotStatusApisIT extends AbstractSnapshotIntegTestCase {
|
|||
* 1. Start snapshot of two shards (both located on separate data nodes).
|
||||
* 2. Have one of the shards snapshot completely and the other block
|
||||
* 3. Restart the data node that completed its shard snapshot
|
||||
* 4. Make sure that snapshot status APIs show correct file-counts and -sizes
|
||||
* 4. Make sure that snapshot status APIs show correct file-counts and -sizes for non-restarted nodes
|
||||
* 5. Make sure the description string is set for shard snapshots on restarted nodes.
|
||||
*
|
||||
* @throws Exception on failure
|
||||
*/
|
||||
|
@ -248,21 +248,24 @@ public class SnapshotStatusApisIT extends AbstractSnapshotIntegTestCase {
|
|||
assertThat(snapshotShardState.getStage(), is(SnapshotIndexShardStage.DONE));
|
||||
assertThat(snapshotShardState.getStats().getTotalFileCount(), greaterThan(0));
|
||||
assertThat(snapshotShardState.getStats().getTotalSize(), greaterThan(0L));
|
||||
assertNull("expected a null description for snapshot shard status: " + snapshotShardState, snapshotShardState.getDescription());
|
||||
}, 30L, TimeUnit.SECONDS);
|
||||
|
||||
final SnapshotStats snapshotShardStats = stateFirstShard(getSnapshotStatus(repoName, snapshotOne), indexTwo).getStats();
|
||||
final int totalFiles = snapshotShardStats.getTotalFileCount();
|
||||
final long totalFileSize = snapshotShardStats.getTotalSize();
|
||||
|
||||
internalCluster().restartNode(dataNodeTwo);
|
||||
|
||||
final SnapshotIndexShardStatus snapshotShardStateAfterNodeRestart = stateFirstShard(
|
||||
getSnapshotStatus(repoName, snapshotOne),
|
||||
indexTwo
|
||||
);
|
||||
assertThat(snapshotShardStateAfterNodeRestart.getStage(), is(SnapshotIndexShardStage.DONE));
|
||||
assertThat(snapshotShardStateAfterNodeRestart.getStats().getTotalFileCount(), equalTo(totalFiles));
|
||||
assertThat(snapshotShardStateAfterNodeRestart.getStats().getTotalSize(), equalTo(totalFileSize));
|
||||
final var snapshotStatusAfterRestart = getSnapshotStatus(repoName, snapshotOne);
|
||||
|
||||
final var snapshotShardStateIndexTwo = stateFirstShard(snapshotStatusAfterRestart, indexTwo);
|
||||
assertThat(snapshotShardStateIndexTwo.getStage(), is(SnapshotIndexShardStage.DONE));
|
||||
assertNotNull("expected a non-null description string for missing stats", snapshotShardStateIndexTwo.getDescription());
|
||||
final var missingStats = snapshotShardStateIndexTwo.getStats();
|
||||
assertThat(missingStats.getTotalFileCount(), equalTo(-1));
|
||||
assertThat(missingStats.getTotalSize(), equalTo(-1L));
|
||||
|
||||
final var snapshotShardStateIndexOne = stateFirstShard(snapshotStatusAfterRestart, indexOne);
|
||||
assertNull("expected a null description string for available stats", snapshotShardStateIndexOne.getDescription());
|
||||
assertThat(snapshotShardStateIndexOne.getStats().getTotalFileCount(), greaterThan(0));
|
||||
assertThat(snapshotShardStateIndexOne.getStats().getTotalSize(), greaterThan(0L));
|
||||
|
||||
unblockAllDataNodes(repoName);
|
||||
assertThat(responseSnapshotOne.get().getSnapshotInfo().state(), is(SnapshotState.SUCCESS));
|
||||
|
|
|
@ -289,6 +289,8 @@ public class TransportVersions {
|
|||
public static final TransportVersion ML_INFERENCE_MISTRAL_CHAT_COMPLETION_ADDED = def(9_090_0_00);
|
||||
public static final TransportVersion IDP_CUSTOM_SAML_ATTRIBUTES_ALLOW_LIST = def(9_091_0_00);
|
||||
public static final TransportVersion SEARCH_SOURCE_EXCLUDE_VECTORS_PARAM = def(9_092_0_00);
|
||||
public static final TransportVersion SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS = def(9_093_0_00);
|
||||
|
||||
/*
|
||||
* STOP! READ THIS FIRST! No, really,
|
||||
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _
|
||||
|
|
|
@ -13,6 +13,7 @@ import org.elasticsearch.action.support.broadcast.BroadcastShardResponse;
|
|||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.core.Nullable;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
|
||||
import org.elasticsearch.xcontent.ToXContentFragment;
|
||||
|
@ -21,6 +22,8 @@ import org.elasticsearch.xcontent.XContentBuilder;
|
|||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
||||
import static org.elasticsearch.TransportVersions.SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS;
|
||||
|
||||
public class SnapshotIndexShardStatus extends BroadcastShardResponse implements ToXContentFragment {
|
||||
|
||||
private final SnapshotIndexShardStage stage;
|
||||
|
@ -31,12 +34,17 @@ public class SnapshotIndexShardStatus extends BroadcastShardResponse implements
|
|||
|
||||
private String failure;
|
||||
|
||||
private String description;
|
||||
|
||||
public SnapshotIndexShardStatus(StreamInput in) throws IOException {
|
||||
super(in);
|
||||
stage = SnapshotIndexShardStage.fromValue(in.readByte());
|
||||
stats = new SnapshotStats(in);
|
||||
nodeId = in.readOptionalString();
|
||||
failure = in.readOptionalString();
|
||||
if (in.getTransportVersion().onOrAfter(SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS)) {
|
||||
description = in.readOptionalString();
|
||||
}
|
||||
}
|
||||
|
||||
SnapshotIndexShardStatus(ShardId shardId, SnapshotIndexShardStage stage) {
|
||||
|
@ -74,11 +82,38 @@ public class SnapshotIndexShardStatus extends BroadcastShardResponse implements
|
|||
}
|
||||
|
||||
SnapshotIndexShardStatus(ShardId shardId, SnapshotIndexShardStage stage, SnapshotStats stats, String nodeId, String failure) {
|
||||
this(shardId, stage, stats, nodeId, failure, null);
|
||||
}
|
||||
|
||||
SnapshotIndexShardStatus(
|
||||
ShardId shardId,
|
||||
SnapshotIndexShardStage stage,
|
||||
SnapshotStats stats,
|
||||
String nodeId,
|
||||
String failure,
|
||||
@Nullable String description
|
||||
) {
|
||||
super(shardId);
|
||||
this.stage = stage;
|
||||
this.stats = stats;
|
||||
this.nodeId = nodeId;
|
||||
this.failure = failure;
|
||||
this.description = description;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates an instance for scenarios where the snapshot is {@link SnapshotIndexShardStage#DONE} but the stats are unavailable, with a
|
||||
* non-null description of why the stats are missing.
|
||||
*/
|
||||
public static SnapshotIndexShardStatus forDoneButMissingStats(ShardId shardId, String description) {
|
||||
return new SnapshotIndexShardStatus(
|
||||
shardId,
|
||||
SnapshotIndexShardStage.DONE,
|
||||
SnapshotStats.forMissingStats(),
|
||||
null,
|
||||
null,
|
||||
Objects.requireNonNull(description)
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -109,6 +144,14 @@ public class SnapshotIndexShardStatus extends BroadcastShardResponse implements
|
|||
return failure;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the optional description of the data values contained in the {@code stats} field.
|
||||
*/
|
||||
@Nullable
|
||||
public String getDescription() {
|
||||
return description;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
|
@ -116,12 +159,16 @@ public class SnapshotIndexShardStatus extends BroadcastShardResponse implements
|
|||
stats.writeTo(out);
|
||||
out.writeOptionalString(nodeId);
|
||||
out.writeOptionalString(failure);
|
||||
if (out.getTransportVersion().onOrAfter(SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS)) {
|
||||
out.writeOptionalString(description);
|
||||
}
|
||||
}
|
||||
|
||||
static final class Fields {
|
||||
static final String STAGE = "stage";
|
||||
static final String REASON = "reason";
|
||||
static final String NODE = "node";
|
||||
static final String DESCRIPTION = "description";
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -135,6 +182,9 @@ public class SnapshotIndexShardStatus extends BroadcastShardResponse implements
|
|||
if (getFailure() != null) {
|
||||
builder.field(Fields.REASON, getFailure());
|
||||
}
|
||||
if (getDescription() != null) {
|
||||
builder.field(Fields.DESCRIPTION, getDescription());
|
||||
}
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
@ -151,7 +201,8 @@ public class SnapshotIndexShardStatus extends BroadcastShardResponse implements
|
|||
return stage == that.stage
|
||||
&& Objects.equals(stats, that.stats)
|
||||
&& Objects.equals(nodeId, that.nodeId)
|
||||
&& Objects.equals(failure, that.failure);
|
||||
&& Objects.equals(failure, that.failure)
|
||||
&& Objects.equals(description, that.description);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -160,6 +211,7 @@ public class SnapshotIndexShardStatus extends BroadcastShardResponse implements
|
|||
result = 31 * result + (stats != null ? stats.hashCode() : 0);
|
||||
result = 31 * result + (nodeId != null ? nodeId.hashCode() : 0);
|
||||
result = 31 * result + (failure != null ? failure.hashCode() : 0);
|
||||
result = 31 * result + (description != null ? description.hashCode() : 0);
|
||||
return result;
|
||||
}
|
||||
|
||||
|
|
|
@ -21,6 +21,8 @@ import org.elasticsearch.xcontent.XContentBuilder;
|
|||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.elasticsearch.TransportVersions.SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS;
|
||||
|
||||
public class SnapshotStats implements Writeable, ToXContentObject {
|
||||
|
||||
private long startTime;
|
||||
|
@ -35,6 +37,19 @@ public class SnapshotStats implements Writeable, ToXContentObject {
|
|||
SnapshotStats() {}
|
||||
|
||||
SnapshotStats(StreamInput in) throws IOException {
|
||||
// We use a boolean to indicate if the stats are present (true) or missing (false), to skip writing all the values if missing.
|
||||
if (in.getTransportVersion().onOrAfter(SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS) && in.readBoolean() == false) {
|
||||
startTime = 0L;
|
||||
time = 0L;
|
||||
incrementalFileCount = -1;
|
||||
processedFileCount = -1;
|
||||
incrementalSize = -1L;
|
||||
processedSize = -1L;
|
||||
totalFileCount = -1;
|
||||
totalSize = -1L;
|
||||
return;
|
||||
}
|
||||
|
||||
startTime = in.readVLong();
|
||||
time = in.readVLong();
|
||||
|
||||
|
@ -69,6 +84,20 @@ public class SnapshotStats implements Writeable, ToXContentObject {
|
|||
this.processedSize = processedSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a stats instance with invalid field values for use in situations where the snapshot stats are unavailable.
|
||||
*/
|
||||
public static SnapshotStats forMissingStats() {
|
||||
return new SnapshotStats(0L, 0L, -1, -1, -1, -1L, -1L, -1L);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if this instance is for a shard snapshot with unavailable stats.
|
||||
*/
|
||||
public boolean isMissingStats() {
|
||||
return incrementalFileCount == -1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns time when snapshot started
|
||||
*/
|
||||
|
@ -127,6 +156,23 @@ public class SnapshotStats implements Writeable, ToXContentObject {
|
|||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
if (out.getTransportVersion().onOrAfter(SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS)) {
|
||||
// We use a boolean to indicate if the stats are present (true) or missing (false), to skip writing all the values if missing.
|
||||
if (isMissingStats()) {
|
||||
out.writeBoolean(false);
|
||||
return;
|
||||
}
|
||||
out.writeBoolean(true);
|
||||
} else if (isMissingStats()) {
|
||||
throw new IllegalStateException(
|
||||
"cannot serialize empty stats for transport version ["
|
||||
+ out.getTransportVersion()
|
||||
+ "] less than ["
|
||||
+ SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS
|
||||
+ "]"
|
||||
);
|
||||
}
|
||||
|
||||
out.writeVLong(startTime);
|
||||
out.writeVLong(time);
|
||||
|
||||
|
@ -196,6 +242,10 @@ public class SnapshotStats implements Writeable, ToXContentObject {
|
|||
* @param updateTimestamps Whether or not start time and duration should be updated
|
||||
*/
|
||||
void add(SnapshotStats stats, boolean updateTimestamps) {
|
||||
if (stats.isMissingStats()) {
|
||||
return;
|
||||
}
|
||||
|
||||
incrementalFileCount += stats.incrementalFileCount;
|
||||
totalFileCount += stats.totalFileCount;
|
||||
processedFileCount += stats.processedFileCount;
|
||||
|
|
|
@ -11,6 +11,8 @@ package org.elasticsearch.action.admin.cluster.snapshots.status;
|
|||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.TransportVersion;
|
||||
import org.elasticsearch.TransportVersions;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionType;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
|
@ -119,7 +121,7 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
|
|||
Arrays.asList(request.snapshots())
|
||||
);
|
||||
if (currentSnapshots.isEmpty()) {
|
||||
buildResponse(snapshotsInProgress, request, currentSnapshots, null, cancellableTask, listener);
|
||||
buildResponse(snapshotsInProgress, request, currentSnapshots, null, state.getMinTransportVersion(), cancellableTask, listener);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -152,6 +154,7 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
|
|||
request,
|
||||
currentSnapshots,
|
||||
nodeSnapshotStatuses,
|
||||
state.getMinTransportVersion(),
|
||||
cancellableTask,
|
||||
l
|
||||
)
|
||||
|
@ -160,7 +163,7 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
|
|||
);
|
||||
} else {
|
||||
// We don't have any in-progress shards, just return current stats
|
||||
buildResponse(snapshotsInProgress, request, currentSnapshots, null, cancellableTask, listener);
|
||||
buildResponse(snapshotsInProgress, request, currentSnapshots, null, state.getMinTransportVersion(), cancellableTask, listener);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -171,6 +174,7 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
|
|||
SnapshotsStatusRequest request,
|
||||
List<SnapshotsInProgress.Entry> currentSnapshotEntries,
|
||||
TransportNodesSnapshotsStatus.NodesSnapshotStatus nodeSnapshotStatuses,
|
||||
TransportVersion minTransportVersion,
|
||||
CancellableTask task,
|
||||
ActionListener<SnapshotsStatusResponse> listener
|
||||
) {
|
||||
|
@ -196,13 +200,13 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
|
|||
}
|
||||
SnapshotsInProgress.ShardSnapshotStatus status = shardEntry.getValue();
|
||||
if (status.nodeId() != null) {
|
||||
// We should have information about this shard from the shard:
|
||||
// We should have information about this shard from the node:
|
||||
TransportNodesSnapshotsStatus.NodeSnapshotStatus nodeStatus = nodeSnapshotStatusMap.get(status.nodeId());
|
||||
if (nodeStatus != null) {
|
||||
Map<ShardId, SnapshotIndexShardStatus> shardStatues = nodeStatus.status().get(entry.snapshot());
|
||||
if (shardStatues != null) {
|
||||
Map<ShardId, SnapshotIndexShardStatus> shardStatuses = nodeStatus.status().get(entry.snapshot());
|
||||
if (shardStatuses != null) {
|
||||
final ShardId sid = entry.shardId(shardEntry.getKey());
|
||||
SnapshotIndexShardStatus shardStatus = shardStatues.get(sid);
|
||||
SnapshotIndexShardStatus shardStatus = shardStatuses.get(sid);
|
||||
if (shardStatus != null) {
|
||||
// We have full information about this shard
|
||||
if (shardStatus.getStage() == SnapshotIndexShardStage.DONE
|
||||
|
@ -228,8 +232,6 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
|
|||
}
|
||||
// We failed to find the status of the shard from the responses we received from data nodes.
|
||||
// This can happen if nodes drop out of the cluster completely or restart during the snapshot.
|
||||
// We rebuild the information they would have provided from their in memory state from the cluster
|
||||
// state and the repository contents in the below logic
|
||||
final SnapshotIndexShardStage stage = switch (shardEntry.getValue().state()) {
|
||||
case FAILED, ABORTED, MISSING -> SnapshotIndexShardStage.FAILURE;
|
||||
case INIT, WAITING, PAUSED_FOR_NODE_REMOVAL, QUEUED -> SnapshotIndexShardStage.STARTED;
|
||||
|
@ -237,18 +239,27 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
|
|||
};
|
||||
final SnapshotIndexShardStatus shardStatus;
|
||||
if (stage == SnapshotIndexShardStage.DONE) {
|
||||
// Shard snapshot completed successfully so we should be able to load the exact statistics for this
|
||||
// shard from the repository already.
|
||||
final ShardId shardId = entry.shardId(shardEntry.getKey());
|
||||
shardStatus = new SnapshotIndexShardStatus(
|
||||
shardId,
|
||||
repositoriesService.repository(entry.repository())
|
||||
.getShardSnapshotStatus(
|
||||
entry.snapshot().getSnapshotId(),
|
||||
entry.indices().get(shardId.getIndexName()),
|
||||
shardId
|
||||
)
|
||||
);
|
||||
// When processing currently running snapshots, instead of reading the statistics from the repository, which can be
|
||||
// expensive, we choose instead to provide a message to the caller explaining why the stats are missing and the API
|
||||
// that can be used to load them once the snapshot has completed.
|
||||
if (minTransportVersion.onOrAfter(TransportVersions.SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS)) {
|
||||
shardStatus = SnapshotIndexShardStatus.forDoneButMissingStats(shardId, """
|
||||
Snapshot shard stats missing from a currently running snapshot due to a node leaving the cluster after \
|
||||
completing the shard snapshot; retry once the snapshot has completed to load all shard stats from the \
|
||||
repository.""");
|
||||
} else {
|
||||
// BWC behavior, load the stats directly from the repository.
|
||||
shardStatus = new SnapshotIndexShardStatus(
|
||||
shardId,
|
||||
repositoriesService.repository(entry.repository())
|
||||
.getShardSnapshotStatus(
|
||||
entry.snapshot().getSnapshotId(),
|
||||
entry.indices().get(shardId.getIndexName()),
|
||||
shardId
|
||||
)
|
||||
);
|
||||
}
|
||||
} else {
|
||||
shardStatus = new SnapshotIndexShardStatus(entry.shardId(shardEntry.getKey()), stage);
|
||||
}
|
||||
|
|
|
@ -39,10 +39,13 @@ public class SnapshotIndexShardStatusTests extends AbstractXContentTestCase<Snap
|
|||
SnapshotStats stats = new SnapshotStatsTests().createTestInstance();
|
||||
String nodeId = randomAlphaOfLength(20);
|
||||
String failure = null;
|
||||
String description = null;
|
||||
if (rarely()) {
|
||||
failure = randomAlphaOfLength(200);
|
||||
} else if (rarely()) {
|
||||
description = randomAlphaOfLength(200);
|
||||
}
|
||||
return new SnapshotIndexShardStatus(shardId, stage, stats, nodeId, failure);
|
||||
return new SnapshotIndexShardStatus(shardId, stage, stats, nodeId, failure, description);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -76,6 +79,7 @@ public class SnapshotIndexShardStatusTests extends AbstractXContentTestCase<Snap
|
|||
String rawStage = (String) parsedObjects[i++];
|
||||
String nodeId = (String) parsedObjects[i++];
|
||||
String failure = (String) parsedObjects[i++];
|
||||
String description = (String) parsedObjects[i++];
|
||||
SnapshotStats stats = (SnapshotStats) parsedObjects[i];
|
||||
|
||||
SnapshotIndexShardStage stage;
|
||||
|
@ -89,12 +93,13 @@ public class SnapshotIndexShardStatusTests extends AbstractXContentTestCase<Snap
|
|||
rawStage
|
||||
);
|
||||
}
|
||||
return new SnapshotIndexShardStatus(shard, stage, stats, nodeId, failure);
|
||||
return new SnapshotIndexShardStatus(shard, stage, stats, nodeId, failure, description);
|
||||
}
|
||||
);
|
||||
innerParser.declareString(constructorArg(), new ParseField(SnapshotIndexShardStatus.Fields.STAGE));
|
||||
innerParser.declareString(optionalConstructorArg(), new ParseField(SnapshotIndexShardStatus.Fields.NODE));
|
||||
innerParser.declareString(optionalConstructorArg(), new ParseField(SnapshotIndexShardStatus.Fields.REASON));
|
||||
innerParser.declareString(optionalConstructorArg(), new ParseField(SnapshotIndexShardStatus.Fields.DESCRIPTION));
|
||||
innerParser.declareObject(
|
||||
constructorArg(),
|
||||
(p, c) -> SnapshotStatsTests.fromXContent(p),
|
||||
|
@ -122,4 +127,8 @@ public class SnapshotIndexShardStatusTests extends AbstractXContentTestCase<Snap
|
|||
XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, parser.currentToken(), parser);
|
||||
return PARSER.parse(parser, indexId, parser.currentName());
|
||||
}
|
||||
|
||||
public void testForDoneButMissingStatsXContentSerialization() throws IOException {
|
||||
testFromXContent(() -> SnapshotIndexShardStatus.forDoneButMissingStats(createTestInstance().getShardId(), randomAlphaOfLength(16)));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -9,11 +9,17 @@
|
|||
|
||||
package org.elasticsearch.action.admin.cluster.snapshots.status;
|
||||
|
||||
import org.elasticsearch.TransportVersion;
|
||||
import org.elasticsearch.TransportVersions;
|
||||
import org.elasticsearch.common.io.stream.ByteArrayStreamInput;
|
||||
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
|
||||
import org.elasticsearch.common.xcontent.XContentParserUtils;
|
||||
import org.elasticsearch.test.AbstractXContentTestCase;
|
||||
import org.elasticsearch.xcontent.XContentParser;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
public class SnapshotStatsTests extends AbstractXContentTestCase<SnapshotStats> {
|
||||
|
||||
|
@ -58,6 +64,51 @@ public class SnapshotStatsTests extends AbstractXContentTestCase<SnapshotStats>
|
|||
return true;
|
||||
}
|
||||
|
||||
public void testMissingStats() throws IOException {
|
||||
final var populatedStats = createTestInstance();
|
||||
final var missingStats = SnapshotStats.forMissingStats();
|
||||
assertEquals(0L, missingStats.getStartTime());
|
||||
assertEquals(0L, missingStats.getTime());
|
||||
assertEquals(-1, missingStats.getTotalFileCount());
|
||||
assertEquals(-1, missingStats.getIncrementalFileCount());
|
||||
assertEquals(-1, missingStats.getProcessedFileCount());
|
||||
assertEquals(-1L, missingStats.getTotalSize());
|
||||
assertEquals(-1L, missingStats.getIncrementalSize());
|
||||
assertEquals(-1L, missingStats.getProcessedSize());
|
||||
|
||||
// Verify round trip Transport serialization.
|
||||
for (var transportVersion : List.of(
|
||||
TransportVersions.MINIMUM_COMPATIBLE,
|
||||
TransportVersions.SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS,
|
||||
TransportVersion.current()
|
||||
)) {
|
||||
|
||||
for (var stats : List.of(populatedStats, missingStats)) {
|
||||
final var bytesOut = new ByteArrayOutputStream();
|
||||
|
||||
try (var streamOut = new OutputStreamStreamOutput(bytesOut)) {
|
||||
streamOut.setTransportVersion(transportVersion);
|
||||
|
||||
if (transportVersion.onOrAfter(TransportVersions.SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS) || stats != missingStats) {
|
||||
stats.writeTo(streamOut);
|
||||
} else {
|
||||
assertThrows(IllegalStateException.class, () -> stats.writeTo(streamOut));
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
try (var streamIn = new ByteArrayStreamInput(bytesOut.toByteArray())) {
|
||||
streamIn.setTransportVersion(transportVersion);
|
||||
final var statsRead = new SnapshotStats(streamIn);
|
||||
assertEquals(stats, statsRead);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Verify round trip XContent serialization.
|
||||
testFromXContent(SnapshotStats::forMissingStats);
|
||||
}
|
||||
|
||||
static SnapshotStats fromXContent(XContentParser parser) throws IOException {
|
||||
// Parse this old school style instead of using the ObjectParser since there's an impedance mismatch between how the
|
||||
// object has historically been written as JSON versus how it is structured in Java.
|
||||
|
|
|
@ -9,17 +9,22 @@
|
|||
|
||||
package org.elasticsearch.action.admin.cluster.snapshots.status;
|
||||
|
||||
import org.elasticsearch.TransportVersion;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.client.internal.node.NodeClient;
|
||||
import org.elasticsearch.cluster.SnapshotsInProgress;
|
||||
import org.elasticsearch.cluster.metadata.ProjectId;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.index.IndexVersion;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.repositories.IndexId;
|
||||
import org.elasticsearch.repositories.RepositoriesService;
|
||||
import org.elasticsearch.repositories.ShardGeneration;
|
||||
import org.elasticsearch.repositories.ShardSnapshotResult;
|
||||
import org.elasticsearch.snapshots.Snapshot;
|
||||
import org.elasticsearch.snapshots.SnapshotId;
|
||||
import org.elasticsearch.tasks.CancellableTask;
|
||||
|
@ -195,9 +200,169 @@ public class TransportSnapshotsStatusActionTests extends ESTestCase {
|
|||
new SnapshotsStatusRequest(TEST_REQUEST_TIMEOUT),
|
||||
currentSnapshotEntries,
|
||||
nodeSnapshotStatuses,
|
||||
TransportVersion.current(),
|
||||
cancellableTask,
|
||||
ActionListener.runAfter(listener, () -> listenerInvoked.set(true))
|
||||
);
|
||||
assertTrue("Expected listener to be invoked", listenerInvoked.get());
|
||||
}
|
||||
|
||||
public void testShardSnapshotMissingDataFromNodeWhenNodeHasBeenRestarted() {
|
||||
final var snapshot = new Snapshot(ProjectId.DEFAULT, "test-repo", new SnapshotId("snapshot", "uuid"));
|
||||
final var indexName = "test-index-name";
|
||||
final var indexUuid = "test-index-uuid";
|
||||
final var shardGeneration = new ShardGeneration("gen");
|
||||
final var shardId2 = new ShardId(indexName, indexUuid, 2);
|
||||
final var nowMsecs = System.currentTimeMillis();
|
||||
final var eightKb = ByteSizeValue.ofKb(8).getBytes();
|
||||
final var shard2SnapshotStats = new SnapshotStats(nowMsecs, 0, 1, 1, 1, eightKb, eightKb, eightKb);
|
||||
|
||||
final var currentSnapshotEntries = List.of(
|
||||
SnapshotsInProgress.Entry.snapshot(
|
||||
snapshot,
|
||||
randomBoolean(),
|
||||
randomBoolean(),
|
||||
SnapshotsInProgress.State.STARTED,
|
||||
Map.of(indexName, new IndexId(indexName, indexUuid)),
|
||||
List.of(),
|
||||
List.of(),
|
||||
randomNonNegativeLong(),
|
||||
randomNonNegativeLong(),
|
||||
Map.of(
|
||||
new ShardId(indexName, indexUuid, 0),
|
||||
SnapshotsInProgress.ShardSnapshotStatus.success(
|
||||
"nodeId0",
|
||||
new ShardSnapshotResult(shardGeneration, ByteSizeValue.ofKb(5), 1)
|
||||
),
|
||||
new ShardId(indexName, indexUuid, 1),
|
||||
new SnapshotsInProgress.ShardSnapshotStatus("nodeId1", shardGeneration),
|
||||
shardId2,
|
||||
SnapshotsInProgress.ShardSnapshotStatus.success(
|
||||
"nodeId2",
|
||||
new ShardSnapshotResult(shardGeneration, ByteSizeValue.ofKb(8), 1)
|
||||
)
|
||||
),
|
||||
null,
|
||||
Map.of(),
|
||||
IndexVersion.current()
|
||||
)
|
||||
);
|
||||
final var nodeSnapshotStatuses = new TransportNodesSnapshotsStatus.NodesSnapshotStatus(
|
||||
clusterService.getClusterName(),
|
||||
List.of(
|
||||
new TransportNodesSnapshotsStatus.NodeSnapshotStatus(
|
||||
new DiscoveryNode(
|
||||
"nodeName0",
|
||||
"nodeId0",
|
||||
new TransportAddress(TransportAddress.META_ADDRESS, 9000),
|
||||
Map.of(),
|
||||
Set.of(),
|
||||
null
|
||||
),
|
||||
// Here we are missing the snapshot data for the shard on this node.
|
||||
Map.of()
|
||||
),
|
||||
new TransportNodesSnapshotsStatus.NodeSnapshotStatus(
|
||||
new DiscoveryNode(
|
||||
"nodeName2",
|
||||
"nodeId2",
|
||||
new TransportAddress(TransportAddress.META_ADDRESS, 9002),
|
||||
Map.of(),
|
||||
Set.of(),
|
||||
null
|
||||
),
|
||||
Map.of(
|
||||
snapshot,
|
||||
Map.of(
|
||||
shardId2,
|
||||
new SnapshotIndexShardStatus(
|
||||
new ShardId(indexName, indexUuid, 2),
|
||||
SnapshotIndexShardStage.DONE,
|
||||
shard2SnapshotStats,
|
||||
"nodeId2",
|
||||
null
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
),
|
||||
List.of()
|
||||
);
|
||||
|
||||
final Consumer<SnapshotsStatusResponse> verifyResponse = rsp -> {
|
||||
assertNotNull(rsp);
|
||||
final var snapshotStatuses = rsp.getSnapshots();
|
||||
assertNotNull(snapshotStatuses);
|
||||
assertEquals(
|
||||
"expected 1 snapshot status, got " + snapshotStatuses.size() + ": " + snapshotStatuses,
|
||||
1,
|
||||
snapshotStatuses.size()
|
||||
);
|
||||
final var snapshotStatus = snapshotStatuses.getFirst();
|
||||
assertEquals(SnapshotsInProgress.State.STARTED, snapshotStatus.getState());
|
||||
final var shardStats = snapshotStatus.getShardsStats();
|
||||
assertNotNull("expected non-null shard stats for SnapshotStatus: " + snapshotStatus, shardStats);
|
||||
assertEquals(new SnapshotShardsStats(0, 1 /* started */, 0, 2 /* done */, 0, 3 /* total */), shardStats);
|
||||
final var totalStats = snapshotStatus.getStats();
|
||||
assertNotNull("expected non-null total stats for SnapshotStatus: " + snapshotStatus, totalStats);
|
||||
assertEquals("expected total file count to be 1 in the stats: " + totalStats, 1, totalStats.getTotalFileCount());
|
||||
assertEquals("expected total size to be " + eightKb + " in the stats: " + totalStats, eightKb, totalStats.getTotalSize());
|
||||
final var snapshotStatusIndices = snapshotStatus.getIndices();
|
||||
assertNotNull("expected a non-null map from getIndices() from SnapshotStatus: " + snapshotStatus, snapshotStatusIndices);
|
||||
final var snapshotIndexStatus = snapshotStatusIndices.get(indexName);
|
||||
assertNotNull(
|
||||
"no entry for indexName [" + indexName + "] found in snapshotStatusIndices: " + snapshotStatusIndices,
|
||||
snapshotIndexStatus
|
||||
);
|
||||
final var shardMap = snapshotIndexStatus.getShards();
|
||||
assertNotNull("expected a non-null shard map for SnapshotIndexStatus: " + snapshotIndexStatus, shardMap);
|
||||
|
||||
// Verify data for the shard 0 entry, which is missing the stats.
|
||||
final var shard0Entry = shardMap.get(0);
|
||||
assertNotNull("no entry for shard 0 found in indexName [" + indexName + "] shardMap: " + shardMap, shard0Entry);
|
||||
assertEquals(SnapshotIndexShardStage.DONE, shard0Entry.getStage());
|
||||
final var description = shard0Entry.getDescription();
|
||||
assertNotNull("expected a non-null description string for shard 0 with missing stats from node0: " + shard0Entry, description);
|
||||
assertTrue(
|
||||
"unexpected description string text: " + description,
|
||||
description.contains("shard stats missing from a currently running snapshot due to")
|
||||
);
|
||||
assertEquals(SnapshotStats.forMissingStats(), shard0Entry.getStats());
|
||||
|
||||
// Verify data for the shard 2 entry, which is DONE and has stats present.
|
||||
final var shard2Entry = shardMap.get(2);
|
||||
assertNotNull("no entry for shard 2 found in indexName [" + indexName + "] shardMap: " + shardMap, shard2Entry);
|
||||
assertEquals(SnapshotIndexShardStage.DONE, shard2Entry.getStage());
|
||||
assertNull(
|
||||
"expected a null description string for shard 2 that has stats data present: " + shard2Entry,
|
||||
shard2Entry.getDescription()
|
||||
);
|
||||
assertEquals("unexpected stats for shard 2: " + shard2Entry, shard2SnapshotStats, shard2Entry.getStats());
|
||||
};
|
||||
|
||||
final var listener = new ActionListener<SnapshotsStatusResponse>() {
|
||||
@Override
|
||||
public void onResponse(SnapshotsStatusResponse rsp) {
|
||||
verifyResponse.accept(rsp);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
fail("expected onResponse() instead of onFailure(" + e + ")");
|
||||
}
|
||||
};
|
||||
|
||||
final var listenerInvoked = new AtomicBoolean(false);
|
||||
|
||||
action.buildResponse(
|
||||
SnapshotsInProgress.EMPTY,
|
||||
new SnapshotsStatusRequest(TEST_REQUEST_TIMEOUT),
|
||||
currentSnapshotEntries,
|
||||
nodeSnapshotStatuses,
|
||||
TransportVersion.current(),
|
||||
new CancellableTask(randomLong(), "type", "action", "desc", null, Map.of()),
|
||||
ActionListener.runAfter(listener, () -> listenerInvoked.set(true))
|
||||
);
|
||||
assertTrue("Expected listener to be invoked", listenerInvoked.get());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -284,9 +284,16 @@ public abstract class AbstractXContentTestCase<T extends ToXContent> extends EST
|
|||
* both for equality and asserts equality on the two queries.
|
||||
*/
|
||||
public final void testFromXContent() throws IOException {
|
||||
testFromXContent(this::createTestInstance);
|
||||
}
|
||||
|
||||
/**
|
||||
* Generic test that creates a new instance using the given supplier and verifies XContent round trip serialization.
|
||||
*/
|
||||
public final void testFromXContent(Supplier<T> testInstanceSupplier) throws IOException {
|
||||
testFromXContent(
|
||||
NUMBER_OF_TEST_RUNS,
|
||||
this::createTestInstance,
|
||||
testInstanceSupplier,
|
||||
supportsUnknownFields(),
|
||||
getShuffleFieldsExceptions(),
|
||||
getRandomFieldsExcludeFilter(),
|
||||
|
|
Loading…
Reference in New Issue