Use Azure blob batch API to delete blobs in batches (#114566)

Closes ES-9777
Nick Tindall 2024-10-24 19:51:52 +11:00 committed by GitHub
parent cc6e7415c1
commit 7599d4cf43
14 changed files with 374 additions and 88 deletions


@@ -0,0 +1,5 @@
pr: 114566
summary: Use Azure blob batch API to delete blobs in batches
area: Distributed
type: enhancement
issues: []


@@ -259,6 +259,15 @@ include::repository-shared-settings.asciidoc[]
`primary_only` or `secondary_only`. Defaults to `primary_only`. Note that if you set it
to `secondary_only`, it will force `readonly` to true.
`delete_objects_max_size`::
(integer) Sets the maximum batch size, between 1 and 256, used for `BlobBatch` requests. Defaults to 256, which is the maximum
number supported by the https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch#remarks[Azure blob batch API].
`max_concurrent_batch_deletes`::
(integer) Sets the maximum number of concurrent batch delete requests that will be submitted for any individual bulk delete with `BlobBatch`. Note that the effective number of concurrent deletes is further limited by the Azure client connection and event loop thread limits. Defaults to 10, minimum is 1, maximum is 100.
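For illustration only, a minimal sketch of how the new keys sit alongside the existing repository settings; the setting keys are from this change, while the values and the surrounding class are arbitrary examples:

```java
import org.elasticsearch.common.settings.Settings;

// Illustrative sketch: the setting keys are real, the values are arbitrary examples.
public class AzureRepositorySettingsExample {
    public static void main(String[] args) {
        Settings repositorySettings = Settings.builder()
            .put("container", "my-container")        // existing repository setting
            .put("delete_objects_max_size", 128)     // up to 128 blobs per BlobBatch request
            .put("max_concurrent_batch_deletes", 5)  // up to 5 concurrent batch requests per bulk delete
            .build();
        System.out.println(repositorySettings.get("delete_objects_max_size")); // prints 128
    }
}
```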
[[repository-azure-validation]]
==== Repository validation rules


@@ -144,6 +144,11 @@
<sha256 value="31915426834400cac854f48441c168d55aa6fc054527f28f1d242a7067affd14" origin="Generated by Gradle"/>
</artifact>
</component>
<component group="com.azure" name="azure-storage-blob-batch" version="12.23.1">
<artifact name="azure-storage-blob-batch-12.23.1.jar">
<sha256 value="8c11749c783222873f63f22575aa5ae7ee8f285388183b82d1a18db21f4d2eba" origin="Generated by Gradle"/>
</artifact>
</component>
<component group="com.azure" name="azure-storage-common" version="12.26.1">
<artifact name="azure-storage-common-12.26.1.jar">
<sha256 value="b0297ac1a9017ccd8a1e5cf41fb8d00ff0adbdd06849f6c5aafb3208708264dd" origin="Generated by Gradle"/>


@@ -30,6 +30,7 @@ dependencies {
api "com.azure:azure-identity:1.13.2"
api "com.azure:azure-json:1.2.0"
api "com.azure:azure-storage-blob:12.27.1"
api "com.azure:azure-storage-blob-batch:12.23.1"
api "com.azure:azure-storage-common:12.26.1"
api "com.azure:azure-storage-internal-avro:12.12.1"
api "com.azure:azure-xml:1.1.0"


@@ -9,14 +9,18 @@
package org.elasticsearch.repositories.azure;
import com.sun.net.httpserver.Headers;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.OperationPurpose;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.repositories.RepositoriesMetrics;
@@ -31,6 +35,7 @@ import org.junit.After;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
@@ -43,6 +48,7 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.elasticsearch.repositories.azure.AbstractAzureServerTestCase.randomBlobContent;
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
@@ -225,6 +231,91 @@ public class AzureBlobStoreRepositoryMetricsTests extends AzureBlobStoreReposito
assertThat(recordedRequestTime, lessThanOrEqualTo(elapsedTimeMillis));
}
public void testBatchDeleteFailure() throws IOException {
final int deleteBatchSize = randomIntBetween(1, 30);
final String repositoryName = randomRepositoryName();
final String repository = createRepository(
repositoryName,
Settings.builder()
.put(repositorySettings(repositoryName))
.put(AzureRepository.Repository.DELETION_BATCH_SIZE_SETTING.getKey(), deleteBatchSize)
.build(),
true
);
final String dataNodeName = internalCluster().getNodeNameThat(DiscoveryNode::canContainData);
final BlobContainer container = getBlobContainer(dataNodeName, repository);
final List<String> blobsToDelete = new ArrayList<>();
final int numberOfBatches = randomIntBetween(3, 20);
final int numberOfBlobs = numberOfBatches * deleteBatchSize;
final int failedBatches = randomIntBetween(1, numberOfBatches);
for (int i = 0; i < numberOfBlobs; i++) {
byte[] bytes = randomBytes(randomInt(100));
String blobName = "index-" + randomAlphaOfLength(10);
container.writeBlob(randomPurpose(), blobName, new BytesArray(bytes), false);
blobsToDelete.add(blobName);
}
Randomness.shuffle(blobsToDelete);
clearMetrics(dataNodeName);
// Handler will fail one or more of the batch requests
final RequestHandler failNRequestRequestHandler = createFailNRequestsHandler(failedBatches);
// Exhaust the retries
IntStream.range(0, (numberOfBatches - failedBatches) + (failedBatches * (MAX_RETRIES + 1)))
.forEach(i -> requestHandlers.offer(failNRequestRequestHandler));
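// e.g. with numberOfBatches = 5, failedBatches = 2 and MAX_RETRIES = 3, we enqueue
// (5 - 2) + 2 * (3 + 1) = 11 handler invocations: one per successful batch plus one
// per attempt (the initial attempt and all retries) for each failing batch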
logger.info("--> Failing {} of {} batches", failedBatches, numberOfBatches);
final IOException exception = assertThrows(
IOException.class,
() -> container.deleteBlobsIgnoringIfNotExists(randomPurpose(), blobsToDelete.iterator())
);
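// AzureBlobStore only propagates the first 10 batch errors as suppressed exceptions, hence the cap here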
assertEquals(Math.min(failedBatches, 10), exception.getSuppressed().length);
assertEquals(
(numberOfBatches - failedBatches) + (failedBatches * (MAX_RETRIES + 1L)),
getLongCounterTotal(dataNodeName, RepositoriesMetrics.METRIC_REQUESTS_TOTAL)
);
assertEquals((failedBatches * (MAX_RETRIES + 1L)), getLongCounterTotal(dataNodeName, RepositoriesMetrics.METRIC_EXCEPTIONS_TOTAL));
assertEquals(failedBatches * deleteBatchSize, container.listBlobs(randomPurpose()).size());
}
private long getLongCounterTotal(String dataNodeName, String metricKey) {
return getTelemetryPlugin(dataNodeName).getLongCounterMeasurement(metricKey)
.stream()
.mapToLong(Measurement::getLong)
.reduce(0L, Long::sum);
}
/**
* Creates a {@link RequestHandler} that will persistently fail the first <code>numberToFail</code> distinct requests
* it sees. Any other requests are passed through to the delegate.
*
* @param numberToFail The number of requests to fail
* @return the handler
*/
private static RequestHandler createFailNRequestsHandler(int numberToFail) {
final List<String> requestsToFail = new ArrayList<>(numberToFail);
return (exchange, delegate) -> {
final Headers requestHeaders = exchange.getRequestHeaders();
final String requestId = requestHeaders.get("X-ms-client-request-id").get(0);
boolean failRequest = false;
synchronized (requestsToFail) {
if (requestsToFail.contains(requestId)) {
failRequest = true;
} else if (requestsToFail.size() < numberToFail) {
requestsToFail.add(requestId);
failRequest = true;
}
}
if (failRequest) {
exchange.sendResponseHeaders(500, -1);
} else {
delegate.handle(exchange);
}
};
}
private void clearMetrics(String discoveryNode) {
internalCluster().getInstance(PluginsService.class, discoveryNode)
.filterPlugins(TestTelemetryPlugin.class)


@@ -89,7 +89,9 @@ public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryInteg
.put(super.repositorySettings(repoName))
.put(AzureRepository.Repository.MAX_SINGLE_PART_UPLOAD_SIZE_SETTING.getKey(), new ByteSizeValue(1, ByteSizeUnit.MB))
.put(AzureRepository.Repository.CONTAINER_SETTING.getKey(), "container")
.put(AzureStorageSettings.ACCOUNT_SETTING.getKey(), "test");
.put(AzureStorageSettings.ACCOUNT_SETTING.getKey(), "test")
.put(AzureRepository.Repository.DELETION_BATCH_SIZE_SETTING.getKey(), randomIntBetween(5, 256))
.put(AzureRepository.Repository.MAX_CONCURRENT_BATCH_DELETES_SETTING.getKey(), randomIntBetween(1, 10));
if (randomBoolean()) {
settingsBuilder.put(AzureRepository.Repository.BASE_PATH_SETTING.getKey(), randomFrom("test", "test/1"));
}
@@ -249,6 +251,8 @@ public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryInteg
trackRequest("PutBlockList");
} else if (Regex.simpleMatch("PUT /*/*", request)) {
trackRequest("PutBlob");
} else if (Regex.simpleMatch("POST /*/*?*comp=batch*", request)) {
trackRequest("BlobBatch");
}
}
@@ -279,10 +283,22 @@ public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryInteg
}
public void testDeleteBlobsIgnoringIfNotExists() throws Exception {
try (BlobStore store = newBlobStore()) {
// Test with a smaller batch size here
final int deleteBatchSize = randomIntBetween(1, 30);
final String repositoryName = randomRepositoryName();
createRepository(
repositoryName,
Settings.builder()
.put(repositorySettings(repositoryName))
.put(AzureRepository.Repository.DELETION_BATCH_SIZE_SETTING.getKey(), deleteBatchSize)
.build(),
true
);
try (BlobStore store = newBlobStore(repositoryName)) {
final BlobContainer container = store.blobContainer(BlobPath.EMPTY);
List<String> blobsToDelete = new ArrayList<>();
for (int i = 0; i < 10; i++) {
final int toDeleteCount = randomIntBetween(deleteBatchSize, 3 * deleteBatchSize);
final List<String> blobsToDelete = new ArrayList<>();
for (int i = 0; i < toDeleteCount; i++) {
byte[] bytes = randomBytes(randomInt(100));
String blobName = randomAlphaOfLength(10);
container.writeBlob(randomPurpose(), blobName, new BytesArray(bytes), false);


@@ -30,6 +30,8 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.Booleans;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.AbstractThirdPartyRepositoryTestCase;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
@@ -46,6 +48,7 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
public class AzureStorageCleanupThirdPartyTests extends AbstractThirdPartyRepositoryTestCase {
private static final Logger logger = LogManager.getLogger(AzureStorageCleanupThirdPartyTests.class);
private static final boolean USE_FIXTURE = Booleans.parseBoolean(System.getProperty("test.azure.fixture", "true"));
private static final String AZURE_ACCOUNT = System.getProperty("test.azure.account");
@@ -89,8 +92,10 @@ public class AzureStorageCleanupThirdPartyTests extends AbstractThirdPartyReposi
MockSecureSettings secureSettings = new MockSecureSettings();
secureSettings.setString("azure.client.default.account", System.getProperty("test.azure.account"));
if (hasSasToken) {
logger.info("--> Using SAS token authentication");
secureSettings.setString("azure.client.default.sas_token", System.getProperty("test.azure.sas_token"));
} else {
logger.info("--> Using key authentication");
secureSettings.setString("azure.client.default.key", System.getProperty("test.azure.key"));
}
return secureSettings;


@@ -18,10 +18,7 @@ module org.elasticsearch.repository.azure {
requires org.apache.logging.log4j;
requires org.apache.logging.log4j.core;
requires com.azure.core;
requires com.azure.http.netty;
requires com.azure.storage.blob;
requires com.azure.storage.common;
requires com.azure.identity;
requires io.netty.buffer;
@@ -29,7 +26,7 @@ module org.elasticsearch.repository.azure {
requires io.netty.resolver;
requires io.netty.common;
requires reactor.core;
requires reactor.netty.core;
requires reactor.netty.http;
requires com.azure.storage.blob.batch;
}


@@ -138,7 +138,7 @@ public class AzureBlobContainer extends AbstractBlobContainer {
}
@Override
public DeleteResult delete(OperationPurpose purpose) {
public DeleteResult delete(OperationPurpose purpose) throws IOException {
return blobStore.deleteBlobDirectory(purpose, keyPath);
}


@@ -25,6 +25,10 @@ import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceAsyncClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.batch.BlobBatch;
import com.azure.storage.blob.batch.BlobBatchAsyncClient;
import com.azure.storage.blob.batch.BlobBatchClientBuilder;
import com.azure.storage.blob.batch.BlobBatchStorageException;
import com.azure.storage.blob.models.BlobErrorCode;
import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.models.BlobItemProperties;
@@ -99,6 +103,8 @@ import static org.elasticsearch.core.Strings.format;
public class AzureBlobStore implements BlobStore {
private static final Logger logger = LogManager.getLogger(AzureBlobStore.class);
// See https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch#request-body
public static final int MAX_ELEMENTS_PER_BATCH = 256;
private static final long DEFAULT_READ_CHUNK_SIZE = new ByteSizeValue(32, ByteSizeUnit.MB).getBytes();
private static final int DEFAULT_UPLOAD_BUFFERS_SIZE = (int) new ByteSizeValue(64, ByteSizeUnit.KB).getBytes();
@@ -110,6 +116,8 @@ public class AzureBlobStore implements BlobStore {
private final String container;
private final LocationMode locationMode;
private final ByteSizeValue maxSinglePartUploadSize;
private final int deletionBatchSize;
private final int maxConcurrentBatchDeletes;
private final RequestMetricsRecorder requestMetricsRecorder;
private final AzureClientProvider.RequestMetricsHandler requestMetricsHandler;
@@ -129,6 +137,8 @@ public class AzureBlobStore implements BlobStore {
// locationMode is set per repository, not per client
this.locationMode = Repository.LOCATION_MODE_SETTING.get(metadata.settings());
this.maxSinglePartUploadSize = Repository.MAX_SINGLE_PART_UPLOAD_SIZE_SETTING.get(metadata.settings());
this.deletionBatchSize = Repository.DELETION_BATCH_SIZE_SETTING.get(metadata.settings());
this.maxConcurrentBatchDeletes = Repository.MAX_CONCURRENT_BATCH_DELETES_SETTING.get(metadata.settings());
List<RequestMatcher> requestMatchers = List.of(
new RequestMatcher((httpMethod, url) -> httpMethod == HttpMethod.HEAD, Operation.GET_BLOB_PROPERTIES),
@@ -147,17 +157,14 @@ public class AzureBlobStore implements BlobStore {
&& isPutBlockRequest(httpMethod, url) == false
&& isPutBlockListRequest(httpMethod, url) == false,
Operation.PUT_BLOB
)
),
new RequestMatcher(AzureBlobStore::isBlobBatch, Operation.BLOB_BATCH)
);
this.requestMetricsHandler = (purpose, method, url, metrics) -> {
try {
URI uri = url.toURI();
String path = uri.getPath() == null ? "" : uri.getPath();
// Batch delete requests
if (path.contains(container) == false) {
return;
}
assert path.contains(container) : uri.toString();
} catch (URISyntaxException ignored) {
return;
@@ -172,6 +179,10 @@ public class AzureBlobStore implements BlobStore {
};
}
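// Batch deletes are container-scoped POSTs with a comp=batch query parameter,
// e.g. "POST /{container}?restype=container&comp=batch" (illustrative URL shape)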
private static boolean isBlobBatch(HttpMethod method, URL url) {
return method == HttpMethod.POST && url.getQuery() != null && url.getQuery().contains("comp=batch");
}
private static boolean isListRequest(HttpMethod httpMethod, URL url) {
return httpMethod == HttpMethod.GET && url.getQuery() != null && url.getQuery().contains("comp=list");
}
@@ -231,95 +242,101 @@ public class AzureBlobStore implements BlobStore {
}
}
// number of concurrent blob delete requests to use while bulk deleting
private static final int CONCURRENT_DELETES = 100;
public DeleteResult deleteBlobDirectory(OperationPurpose purpose, String path) {
public DeleteResult deleteBlobDirectory(OperationPurpose purpose, String path) throws IOException {
final AtomicInteger blobsDeleted = new AtomicInteger(0);
final AtomicLong bytesDeleted = new AtomicLong(0);
SocketAccess.doPrivilegedVoidException(() -> {
final BlobContainerAsyncClient blobContainerAsyncClient = asyncClient(purpose).getBlobContainerAsyncClient(container);
final AzureBlobServiceClient client = getAzureBlobServiceClientClient(purpose);
final BlobContainerAsyncClient blobContainerAsyncClient = client.getAsyncClient().getBlobContainerAsyncClient(container);
final ListBlobsOptions options = new ListBlobsOptions().setPrefix(path)
.setDetails(new BlobListDetails().setRetrieveMetadata(true));
try {
blobContainerAsyncClient.listBlobs(options, null).flatMap(blobItem -> {
if (blobItem.isPrefix() != null && blobItem.isPrefix()) {
return Mono.empty();
} else {
final String blobName = blobItem.getName();
BlobAsyncClient blobAsyncClient = blobContainerAsyncClient.getBlobAsyncClient(blobName);
final Mono<Void> deleteTask = getDeleteTask(blobName, blobAsyncClient);
bytesDeleted.addAndGet(blobItem.getProperties().getContentLength());
blobsDeleted.incrementAndGet();
return deleteTask;
}
}, CONCURRENT_DELETES).then().block();
} catch (Exception e) {
filterDeleteExceptionsAndRethrow(e, new IOException("Deleting directory [" + path + "] failed"));
}
final Flux<String> blobsFlux = blobContainerAsyncClient.listBlobs(options).filter(bi -> bi.isPrefix() == false).map(bi -> {
bytesDeleted.addAndGet(bi.getProperties().getContentLength());
blobsDeleted.incrementAndGet();
return bi.getName();
});
deleteListOfBlobs(client, blobsFlux);
});
return new DeleteResult(blobsDeleted.get(), bytesDeleted.get());
}
private static void filterDeleteExceptionsAndRethrow(Exception e, IOException exception) throws IOException {
int suppressedCount = 0;
for (Throwable suppressed : e.getSuppressed()) {
// We're only interested in the blob deletion exceptions, not the reactor internals exceptions
if (suppressed instanceof IOException) {
exception.addSuppressed(suppressed);
suppressedCount++;
if (suppressedCount > 10) {
break;
}
}
}
throw exception;
}
@Override
public void deleteBlobsIgnoringIfNotExists(OperationPurpose purpose, Iterator<String> blobNames) throws IOException {
if (blobNames.hasNext() == false) {
return;
}
SocketAccess.doPrivilegedVoidException(
() -> deleteListOfBlobs(
getAzureBlobServiceClientClient(purpose),
Flux.fromStream(StreamSupport.stream(Spliterators.spliteratorUnknownSize(blobNames, Spliterator.ORDERED), false))
)
);
}
private void deleteListOfBlobs(AzureBlobServiceClient azureBlobServiceClient, Flux<String> blobNames) throws IOException {
// We need to use a container-scoped BlobBatchClient, so the restype=container parameter
// is sent, and we can support all SAS token types
// See https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=shared-access-signatures#authorization
final BlobBatchAsyncClient batchAsyncClient = new BlobBatchClientBuilder(
azureBlobServiceClient.getAsyncClient().getBlobContainerAsyncClient(container)
).buildAsyncClient();
final List<Throwable> errors;
final AtomicInteger errorsCollected = new AtomicInteger(0);
try {
errors = blobNames.buffer(deletionBatchSize).flatMap(blobs -> {
final BlobBatch blobBatch = batchAsyncClient.getBlobBatch();
blobs.forEach(blob -> blobBatch.deleteBlob(container, blob));
return batchAsyncClient.submitBatch(blobBatch).then(Mono.<Throwable>empty()).onErrorResume(t -> {
// Ignore errors that are just 404s, send other errors downstream as values
if (AzureBlobStore.isIgnorableBatchDeleteException(t)) {
return Mono.empty();
} else {
// Propagate the first 10 errors only
if (errorsCollected.getAndIncrement() < 10) {
return Mono.just(t);
} else {
return Mono.empty();
}
}
});
}, maxConcurrentBatchDeletes).collectList().block();
} catch (Exception e) {
throw new IOException("Error deleting batches", e);
}
if (errors.isEmpty() == false) {
final int totalErrorCount = errorsCollected.get();
final String errorMessage = totalErrorCount > errors.size()
? "Some errors occurred deleting batches, the first "
+ errors.size()
+ " are included as suppressed, but the total count was "
+ totalErrorCount
: "Some errors occurred deleting batches, all errors included as suppressed";
final IOException ex = new IOException(errorMessage);
errors.forEach(ex::addSuppressed);
throw ex;
}
}
/**
* {@inheritDoc}
* <p>
* Note that in this Azure implementation we issue a series of individual
* <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/delete-blob">delete blob</a> calls rather than aggregating
* deletions into <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch">blob batch</a> calls.
* The reason for this is that the blob batch endpoint has limited support for SAS token authentication.
*
* @see <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=shared-access-signatures#authorization">
* API docs around SAS auth limitations</a>
* @see <a href="https://github.com/Azure/azure-storage-java/issues/538">Java SDK issue</a>
* @see <a href="https://github.com/elastic/elasticsearch/pull/65140#discussion_r528752070">Discussion on implementing PR</a>
*/
@Override
public void deleteBlobsIgnoringIfNotExists(OperationPurpose purpose, Iterator<String> blobs) {
if (blobs.hasNext() == false) {
return;
}
BlobServiceAsyncClient asyncClient = asyncClient(purpose);
SocketAccess.doPrivilegedVoidException(() -> {
final BlobContainerAsyncClient blobContainerClient = asyncClient.getBlobContainerAsyncClient(container);
try {
Flux.fromStream(StreamSupport.stream(Spliterators.spliteratorUnknownSize(blobs, Spliterator.ORDERED), false))
.flatMap(blob -> getDeleteTask(blob, blobContainerClient.getBlobAsyncClient(blob)), CONCURRENT_DELETES)
.then()
.block();
} catch (Exception e) {
filterDeleteExceptionsAndRethrow(e, new IOException("Unable to delete blobs"));
}
});
}
private static Mono<Void> getDeleteTask(String blobName, BlobAsyncClient blobAsyncClient) {
return blobAsyncClient.delete()
// Ignore not found blobs, as it's possible that due to network errors a request
// for an already deleted blob is retried, causing an error.
.onErrorResume(
e -> e instanceof BlobStorageException blobStorageException && blobStorageException.getStatusCode() == 404,
throwable -> Mono.empty()
)
.onErrorMap(throwable -> new IOException("Error deleting blob " + blobName, throwable));
}
/**
* We can ignore {@link BlobBatchStorageException}s when they are just telling us some of the files were not found
*
* @param exception An exception thrown by batch delete
* @return true if it is safe to ignore, false otherwise
*/
private static boolean isIgnorableBatchDeleteException(Throwable exception) {
if (exception instanceof BlobBatchStorageException bbse) {
final Iterable<BlobStorageException> batchExceptions = bbse.getBatchExceptions();
for (BlobStorageException bse : batchExceptions) {
// If any requests failed with something other than a BLOB_NOT_FOUND, it is not ignorable
if (BlobErrorCode.BLOB_NOT_FOUND.equals(bse.getErrorCode()) == false) {
return false;
}
}
return true;
}
return false;
}
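A minimal, standalone Reactor sketch of the batching and error-collection pattern used by `deleteListOfBlobs` above; the names are hypothetical and a simulated `submitBatch` stands in for the Azure batch client:

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch (hypothetical names): buffer blob names into fixed-size batches, submit each
// batch with bounded concurrency, surface failures as values, and keep at most the
// first 10 of them as suppressed exceptions on a single IOException.
public class BatchDeleteSketch {
    public static void main(String[] args) throws IOException {
        final int batchSize = 3;      // stands in for deletionBatchSize
        final int maxConcurrent = 2;  // stands in for maxConcurrentBatchDeletes
        final AtomicInteger errorsCollected = new AtomicInteger(0);
        final List<Throwable> errors = Flux.range(0, 10)
            .map(i -> "blob-" + i)
            .buffer(batchSize) // Flux<List<String>>
            .flatMap(
                batch -> submitBatch(batch).then(Mono.<Throwable>empty())
                    // surface the first 10 errors downstream as values, drop the rest
                    .onErrorResume(t -> errorsCollected.getAndIncrement() < 10 ? Mono.just(t) : Mono.empty()),
                maxConcurrent
            )
            .collectList()
            .block();
        if (errors != null && errors.isEmpty() == false) {
            final IOException ex = new IOException("Some errors occurred deleting batches");
            errors.forEach(ex::addSuppressed);
            throw ex;
        }
    }

    // Simulated batch submission: fails for the batch containing "blob-4"
    private static Mono<Void> submitBatch(List<String> batch) {
        return batch.contains("blob-4")
            ? Mono.error(new RuntimeException("simulated batch failure"))
            : Mono.empty();
    }
}
```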
public InputStream getInputStream(OperationPurpose purpose, String blob, long position, final @Nullable Long length) {
@@ -363,8 +380,7 @@ public class AzureBlobStore implements BlobStore {
for (final BlobItem blobItem : containerClient.listBlobsByHierarchy("/", listBlobsOptions, null)) {
BlobItemProperties properties = blobItem.getProperties();
Boolean isPrefix = blobItem.isPrefix();
if (isPrefix != null && isPrefix) {
if (blobItem.isPrefix()) {
continue;
}
String blobName = blobItem.getName().substring(keyPath.length());
@@ -689,7 +705,8 @@ public class AzureBlobStore implements BlobStore {
GET_BLOB_PROPERTIES("GetBlobProperties"),
PUT_BLOB("PutBlob"),
PUT_BLOCK("PutBlock"),
PUT_BLOCK_LIST("PutBlockList");
PUT_BLOCK_LIST("PutBlockList"),
BLOB_BATCH("BlobBatch");
private final String key;


@@ -317,6 +317,11 @@ class AzureClientProvider extends AbstractLifecycleComponent {
@Override
public Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineNextPolicy next) {
if (requestIsPartOfABatch(context)) {
// Batch deletes fire once for each of the constituent requests, and they have a null response. Ignore those, we'll track
// metrics at the bulk level.
return next.process();
}
Optional<Object> metricsData = context.getData(RequestMetricsTracker.ES_REQUEST_METRICS_CONTEXT_KEY);
if (metricsData.isPresent() == false) {
assert false : "No metrics object associated with request " + context.getHttpRequest();
@@ -361,6 +366,11 @@ class AzureClientProvider extends AbstractLifecycleComponent {
@Override
public Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineNextPolicy next) {
if (requestIsPartOfABatch(context)) {
// Batch deletes fire once for each of the constituent requests, and they have a null response. Ignore those, we'll track
// metrics at the bulk level.
return next.process();
}
final RequestMetrics requestMetrics = new RequestMetrics();
context.setData(ES_REQUEST_METRICS_CONTEXT_KEY, requestMetrics);
return next.process().doOnSuccess((httpResponse) -> {
@@ -389,6 +399,10 @@ class AzureClientProvider extends AbstractLifecycleComponent {
}
}
private static boolean requestIsPartOfABatch(HttpPipelineCallContext context) {
return context.getData("Batch-Operation-Info").isPresent();
}
/**
* The {@link RequestMetricsTracker} calls this when a request completes
*/

View File

@@ -87,6 +87,21 @@ public class AzureRepository extends MeteredBlobStoreRepository {
DEFAULT_MAX_SINGLE_UPLOAD_SIZE,
Property.NodeScope
);
/**
* The batch size for batched delete requests
*/
static final Setting<Integer> DELETION_BATCH_SIZE_SETTING = Setting.intSetting(
"delete_objects_max_size",
AzureBlobStore.MAX_ELEMENTS_PER_BATCH,
1,
AzureBlobStore.MAX_ELEMENTS_PER_BATCH
);
/**
* The maximum number of concurrent batch deletes
*/
static final Setting<Integer> MAX_CONCURRENT_BATCH_DELETES_SETTING = Setting.intSetting("max_concurrent_batch_deletes", 10, 1, 100);
}
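A small illustration, not from this PR, of how the bounds on `intSetting` behave; the exception message shown is approximate:

```java
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;

// Hypothetical illustration: Setting.intSetting validates its bounds when the value
// is read, so an out-of-range repository setting fails fast at registration time.
public class SettingBoundsExample {
    public static void main(String[] args) {
        Setting<Integer> batchSize = Setting.intSetting("delete_objects_max_size", 256, 1, 256);
        // Throws IllegalArgumentException (message approximately:
        // "Failed to parse value [300] for setting [delete_objects_max_size] must be <= 256")
        batchSize.get(Settings.builder().put("delete_objects_max_size", 300).build());
    }
}
```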
private final ByteSizeValue chunkSize;


@@ -18,6 +18,7 @@ import org.junit.Before;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
public class AzureBlobContainerStatsTests extends AbstractAzureServerTestCase {
@@ -47,6 +48,8 @@ public class AzureBlobContainerStatsTests extends AbstractAzureServerTestCase {
os.write(blobContent);
os.flush();
});
// BLOB_BATCH
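// The three blobs fit in a single batch (default delete_objects_max_size is 256), so this should count as exactly one BlobBatch request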
blobStore.deleteBlobsIgnoringIfNotExists(purpose, List.of(randomIdentifier(), randomIdentifier(), randomIdentifier()).iterator());
Map<String, Long> stats = blobStore.stats();
String statsMapString = stats.toString();
@@ -55,6 +58,7 @@ public class AzureBlobContainerStatsTests extends AbstractAzureServerTestCase {
assertEquals(statsMapString, Long.valueOf(1L), stats.get(statsKey(purpose, AzureBlobStore.Operation.GET_BLOB_PROPERTIES)));
assertEquals(statsMapString, Long.valueOf(1L), stats.get(statsKey(purpose, AzureBlobStore.Operation.PUT_BLOCK)));
assertEquals(statsMapString, Long.valueOf(1L), stats.get(statsKey(purpose, AzureBlobStore.Operation.PUT_BLOCK_LIST)));
assertEquals(statsMapString, Long.valueOf(1L), stats.get(statsKey(purpose, AzureBlobStore.Operation.BLOB_BATCH)));
}
public void testOperationPurposeIsNotReflectedInBlobStoreStatsWhenNotServerless() throws IOException {
@@ -79,6 +83,11 @@ public class AzureBlobContainerStatsTests extends AbstractAzureServerTestCase {
os.write(blobContent);
os.flush();
});
// BLOB_BATCH
blobStore.deleteBlobsIgnoringIfNotExists(
purpose,
List.of(randomIdentifier(), randomIdentifier(), randomIdentifier()).iterator()
);
}
Map<String, Long> stats = blobStore.stats();
@@ -88,6 +97,7 @@ public class AzureBlobContainerStatsTests extends AbstractAzureServerTestCase {
assertEquals(statsMapString, Long.valueOf(repeatTimes), stats.get(AzureBlobStore.Operation.GET_BLOB_PROPERTIES.getKey()));
assertEquals(statsMapString, Long.valueOf(repeatTimes), stats.get(AzureBlobStore.Operation.PUT_BLOCK.getKey()));
assertEquals(statsMapString, Long.valueOf(repeatTimes), stats.get(AzureBlobStore.Operation.PUT_BLOCK_LIST.getKey()));
assertEquals(statsMapString, Long.valueOf(repeatTimes), stats.get(AzureBlobStore.Operation.BLOB_BATCH.getKey()));
}
private static String statsKey(OperationPurpose purpose, AzureBlobStore.Operation operation) {


@@ -12,6 +12,9 @@ import com.sun.net.httpserver.Headers;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Streams;
@@ -23,10 +26,14 @@ import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentType;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
@@ -35,6 +42,7 @@ import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import java.util.regex.Matcher;
@@ -47,6 +55,8 @@ import static org.elasticsearch.repositories.azure.AzureFixtureHelper.assertVali
*/
@SuppressForbidden(reason = "Uses a HttpServer to emulate an Azure endpoint")
public class AzureHttpHandler implements HttpHandler {
private static final Logger logger = LogManager.getLogger(AzureHttpHandler.class);
private final Map<String, BytesReference> blobs;
private final String account;
private final String container;
@@ -264,7 +274,98 @@ public class AzureHttpHandler implements HttpHandler {
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
exchange.getResponseBody().write(response);
} else if (Regex.simpleMatch("POST /" + account + "/" + container + "*restype=container*comp=batch*", request)) {
// Blob Batch (https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch)
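// The request body is multipart/mixed: each part is an application/http sub-request,
// shaped roughly like this (illustrative, not an exact capture):
//   --batch_<uuid>
//   Content-Type: application/http
//   Content-ID: 0
//
//   DELETE /container/blob-name HTTP/1.1
//   x-ms-client-request-id: <uuid>
//   ...
//   --batch_<uuid>--
// The loop below walks the body line by line, collecting the Content-ID, client
// request id and target blob for each part before emitting a matching response part.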
final StringBuilder response = new StringBuilder();
try (BufferedReader requestReader = new BufferedReader(new InputStreamReader(exchange.getRequestBody()))) {
final String batchBoundary = requestReader.readLine();
final String responseBoundary = "batch_" + UUID.randomUUID();
String line;
String contentId = null, requestId = null, toDelete = null;
while ((line = requestReader.readLine()) != null) {
if (batchBoundary.equals(line) || (batchBoundary + "--").equals(line)) {
// Found the end of a single request, process it
if (contentId == null || requestId == null || toDelete == null) {
throw new IllegalStateException(
"Missing contentId/requestId/toDelete: " + contentId + "/" + requestId + "/" + toDelete
);
}
// Process the deletion
if (blobs.remove("/" + account + toDelete) != null) {
final String acceptedPart = Strings.format("""
--%s
Content-Type: application/http
Content-ID: %s
HTTP/1.1 202 Accepted
x-ms-delete-type-permanent: true
x-ms-request-id: %s
x-ms-version: 2018-11-09
""", responseBoundary, contentId, requestId).replaceAll("\n", "\r\n");
response.append(acceptedPart);
} else {
final String notFoundBody = Strings.format(
"""
<?xml version="1.0" encoding="utf-8"?>
<Error><Code>BlobNotFound</Code><Message>The specified blob does not exist.
RequestId:%s
Time:%s</Message></Error>""",
requestId,
DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now(ZoneId.of("UTC")))
);
final String notFoundPart = Strings.format("""
--%s
Content-Type: application/http
Content-ID: %s
HTTP/1.1 404 The specified blob does not exist.
x-ms-error-code: BlobNotFound
x-ms-request-id: %s
x-ms-version: 2018-11-09
Content-Length: %d
Content-Type: application/xml
%s
""", responseBoundary, contentId, requestId, notFoundBody.length(), notFoundBody)
.replaceAll("\n", "\r\n");
response.append(notFoundPart);
}
// Clear the state
toDelete = null;
contentId = null;
requestId = null;
} else if (Regex.simpleMatch("x-ms-client-request-id: *", line)) {
if (requestId != null) {
throw new IllegalStateException("Got multiple request IDs in a single request?");
}
requestId = line.split("\\s")[1];
} else if (Regex.simpleMatch("Content-ID: *", line)) {
if (contentId != null) {
throw new IllegalStateException("Got multiple content IDs in a single request?");
}
contentId = line.split("\\s")[1];
} else if (Regex.simpleMatch("DELETE /" + container + "/*", line)) {
String blobName = RestUtils.decodeComponent(line.split("(\\s|\\?)")[1]);
if (toDelete != null) {
throw new IllegalStateException("Got multiple deletes in a single request?");
}
toDelete = blobName;
}
}
response.append("--").append(responseBoundary).append("--\r\n0\r\n");
// Send the response
exchange.getResponseHeaders().add("Content-Type", "multipart/mixed; boundary=" + responseBoundary);
exchange.sendResponseHeaders(RestStatus.ACCEPTED.getStatus(), response.length());
logger.debug("--> Sending response:\n{}", response);
exchange.getResponseBody().write(response.toString().getBytes(StandardCharsets.UTF_8));
}
} else {
logger.warn("--> Unrecognised request received: {}", request);
sendError(exchange, RestStatus.BAD_REQUEST);
}
} finally {