Ensure vectors are always included in reindex actions (#130834)

This change modifies reindex to always fetch vector fields from the source documents, even when embeddings are excluded from _source by default.
This prepares for scenarios where embeddings may be automatically excluded from _source (#130382).
This commit is contained in:
Jim Ferenczi 2025-07-09 07:45:13 +02:00 committed by GitHub
parent 950e1294d7
commit 29e01e1964
12 changed files with 244 additions and 24 deletions
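
At its core, the change lives in AbstractAsyncBulkByScrollAction#prepareSearchRequest (first Java file below): when an action declares that it needs vectors, the prepared search request is adjusted so vectors stay in the fetched _source unless the caller explicitly opted out. A minimal sketch of that logic, condensed from the diff (the wrapper class is illustrative only):

import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;

final class IncludeVectorsSketch {
    // Mirrors the needsVectors branch added to prepareSearchRequest.
    static void includeVectors(SearchSourceBuilder sourceBuilder) {
        var fetchSource = sourceBuilder.fetchSource();
        if (fetchSource == null) {
            // No _source filtering configured: fetch the full _source, vectors included.
            sourceBuilder.fetchSource(FetchSourceContext.FETCH_ALL_SOURCE);
        } else if (fetchSource.excludeVectors() == null) {
            // Filtering configured but exclude_vectors left unset: force vectors back in.
            sourceBuilder.excludeVectors(false);
        }
    }
}

Reindex and update-by-query pass needsVectors=true; delete-by-query passes false.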

View File

@@ -0,0 +1,5 @@
pr: 130834
summary: Ensure vectors are always included in reindex actions
area: Vector Search
type: enhancement
issues: []

View File

@@ -44,6 +44,7 @@ import org.elasticsearch.script.Metadata;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.threadpool.ThreadPool;
@@ -119,6 +120,7 @@ public abstract class AbstractAsyncBulkByScrollAction<
BulkByScrollTask task,
boolean needsSourceDocumentVersions,
boolean needsSourceDocumentSeqNoAndPrimaryTerm,
boolean needsVectors,
Logger logger,
ParentTaskAssigningClient client,
ThreadPool threadPool,
@@ -131,6 +133,7 @@ public abstract class AbstractAsyncBulkByScrollAction<
task,
needsSourceDocumentVersions,
needsSourceDocumentSeqNoAndPrimaryTerm,
needsVectors,
logger,
client,
client,
@@ -146,6 +149,7 @@ public abstract class AbstractAsyncBulkByScrollAction<
BulkByScrollTask task,
boolean needsSourceDocumentVersions,
boolean needsSourceDocumentSeqNoAndPrimaryTerm,
boolean needsVectors,
Logger logger,
ParentTaskAssigningClient searchClient,
ParentTaskAssigningClient bulkClient,
@@ -173,7 +177,7 @@ public abstract class AbstractAsyncBulkByScrollAction<
bulkRetry = new Retry(BackoffPolicy.wrap(backoffPolicy, worker::countBulkRetry), threadPool);
scrollSource = buildScrollableResultSource(
backoffPolicy,
prepareSearchRequest(mainRequest, needsSourceDocumentVersions, needsSourceDocumentSeqNoAndPrimaryTerm)
prepareSearchRequest(mainRequest, needsSourceDocumentVersions, needsSourceDocumentSeqNoAndPrimaryTerm, needsVectors)
);
scriptApplier = Objects.requireNonNull(buildScriptApplier(), "script applier must not be null");
}
@@ -186,7 +190,8 @@ public abstract class AbstractAsyncBulkByScrollAction<
static <Request extends AbstractBulkByScrollRequest<Request>> SearchRequest prepareSearchRequest(
Request mainRequest,
boolean needsSourceDocumentVersions,
boolean needsSourceDocumentSeqNoAndPrimaryTerm
boolean needsSourceDocumentSeqNoAndPrimaryTerm,
boolean needsVectors
) {
var preparedSearchRequest = new SearchRequest(mainRequest.getSearchRequest());
@@ -205,6 +210,16 @@ public abstract class AbstractAsyncBulkByScrollAction<
sourceBuilder.version(needsSourceDocumentVersions);
sourceBuilder.seqNoAndPrimaryTerm(needsSourceDocumentSeqNoAndPrimaryTerm);
if (needsVectors) {
// always include vectors in the response unless explicitly set
var fetchSource = sourceBuilder.fetchSource();
if (fetchSource == null) {
sourceBuilder.fetchSource(FetchSourceContext.FETCH_ALL_SOURCE);
} else if (fetchSource.excludeVectors() == null) {
sourceBuilder.excludeVectors(false);
}
}
/*
* Do not open scroll if max docs <= scroll size and not resuming on version conflicts
*/

View File

@@ -34,7 +34,7 @@ public class AsyncDeleteByQueryAction extends AbstractAsyncBulkByScrollAction<De
ScriptService scriptService,
ActionListener<BulkByScrollResponse> listener
) {
super(task, false, true, logger, client, threadPool, request, listener, scriptService, null);
super(task, false, true, false, logger, client, threadPool, request, listener, scriptService, null);
}
@Override

View File

@@ -237,6 +237,7 @@ public class Reindexer {
*/
request.getDestination().versionType() != VersionType.INTERNAL,
false,
true,
logger,
searchClient,
bulkClient,

View File

@@ -124,6 +124,7 @@ public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateB
// use sequence number powered optimistic concurrency control unless requested
request.getSearchRequest().source() != null && Boolean.TRUE.equals(request.getSearchRequest().source().version()),
true,
true,
logger,
client,
threadPool,

View File

@@ -20,6 +20,7 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.xcontent.NamedXContentRegistry;
@@ -140,13 +141,33 @@ final class RemoteRequestBuilders {
}
}
if (searchRequest.source().fetchSource() != null) {
entity.field("_source", searchRequest.source().fetchSource());
} else {
var fetchSource = searchRequest.source().fetchSource();
if (fetchSource == null) {
if (remoteVersion.onOrAfter(Version.fromId(1000099))) {
// Versions before 1.0 don't support `"_source": true` so we have to ask for the source as a stored field.
entity.field("_source", true);
}
} else {
if (remoteVersion.onOrAfter(Version.V_9_1_0) || fetchSource.excludeVectors() == null) {
entity.field("_source", fetchSource);
} else {
// Versions before 9.1.0 don't support "exclude_vectors" so we need to manually convert.
if (fetchSource.includes().length == 0 && fetchSource.excludes().length == 0) {
if (remoteVersion.onOrAfter(Version.fromId(1000099))) {
// Versions before 1.0 don't support `"_source": true` so we have to ask for the source as a stored field.
entity.field("_source", true);
}
} else {
entity.startObject("_source");
if (fetchSource.includes().length > 0) {
entity.field(FetchSourceContext.INCLUDES_FIELD.getPreferredName(), fetchSource.includes());
}
if (fetchSource.excludes().length > 0) {
entity.field(FetchSourceContext.EXCLUDES_FIELD.getPreferredName(), fetchSource.excludes());
}
entity.endObject();
}
}
}
entity.endObject();
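
For remote reindex, the conversion above decides which `_source` clause is sent to the remote cluster based on whether it understands "exclude_vectors" (added in 9.1.0). A hedged sketch of that decision with hypothetical names, matching the code above and the RemoteRequestBuildersTests expectations further down:

import org.elasticsearch.search.fetch.subphase.FetchSourceContext;

final class RemoteSourceClauseSketch {
    enum Shape { PASS_THROUGH, INCLUDES_EXCLUDES_ONLY, SOURCE_TRUE, OMITTED }

    // remoteOnOrAfter910 / remoteOnOrAfter100 stand in for the Version checks above.
    static Shape shapeFor(boolean remoteOnOrAfter910, boolean remoteOnOrAfter100, FetchSourceContext fetchSource) {
        if (fetchSource == null) {
            // No fetch-source context: pre-1.0 remotes get no _source field at all.
            return remoteOnOrAfter100 ? Shape.SOURCE_TRUE : Shape.OMITTED;
        }
        if (remoteOnOrAfter910 || fetchSource.excludeVectors() == null) {
            // Serialize the context as-is (exclude_vectors included on 9.1.0+ remotes).
            return Shape.PASS_THROUGH;
        }
        if (fetchSource.includes().length == 0 && fetchSource.excludes().length == 0) {
            return remoteOnOrAfter100 ? Shape.SOURCE_TRUE : Shape.OMITTED;
        }
        // Manual conversion: drop exclude_vectors, keep the includes/excludes filters.
        return Shape.INCLUDES_EXCLUDES_ONLY;
    }
}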

View File

@@ -876,7 +876,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
}
public void testEnableScrollByDefault() {
var preparedSearchRequest = AbstractAsyncBulkByScrollAction.prepareSearchRequest(testRequest, false, false);
var preparedSearchRequest = AbstractAsyncBulkByScrollAction.prepareSearchRequest(testRequest, false, false, false);
assertThat(preparedSearchRequest.scroll(), notNullValue());
}
@@ -884,7 +884,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
testRequest.setMaxDocs(between(101, 1000));
testRequest.getSearchRequest().source().size(100);
var preparedSearchRequest = AbstractAsyncBulkByScrollAction.prepareSearchRequest(testRequest, false, false);
var preparedSearchRequest = AbstractAsyncBulkByScrollAction.prepareSearchRequest(testRequest, false, false, false);
assertThat(preparedSearchRequest.scroll(), notNullValue());
}
@@ -893,7 +893,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
testRequest.setMaxDocs(between(1, 100));
testRequest.getSearchRequest().source().size(100);
var preparedSearchRequest = AbstractAsyncBulkByScrollAction.prepareSearchRequest(testRequest, false, false);
var preparedSearchRequest = AbstractAsyncBulkByScrollAction.prepareSearchRequest(testRequest, false, false, false);
assertThat(preparedSearchRequest.scroll(), nullValue());
}
@@ -903,7 +903,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
testRequest.getSearchRequest().source().size(100);
testRequest.setAbortOnVersionConflict(false);
var preparedSearchRequest = AbstractAsyncBulkByScrollAction.prepareSearchRequest(testRequest, false, false);
var preparedSearchRequest = AbstractAsyncBulkByScrollAction.prepareSearchRequest(testRequest, false, false, false);
assertThat(preparedSearchRequest.scroll(), notNullValue());
}
@@ -943,6 +943,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
testTask,
randomBoolean(),
randomBoolean(),
randomBoolean(),
AsyncBulkByScrollActionTests.this.logger,
new ParentTaskAssigningClient(client, localNode, testTask),
client.threadPool(),

View File

@@ -10,6 +10,8 @@
package org.elasticsearch.reindex;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.ReindexRequestBuilder;
@@ -22,7 +24,10 @@ import java.util.Map;
import java.util.stream.Collectors;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
@@ -175,4 +180,60 @@ public class ReindexBasicTests extends ReindexTestCase {
assertHitCount(prepareSearch(destIndexName).setSize(0), 4);
}
public void testReindexIncludeVectors() throws Exception {
var resp1 = prepareCreate("test").setSettings(
Settings.builder().put(IndexSettings.INDEX_MAPPING_SOURCE_SYNTHETIC_VECTORS_SETTING.getKey(), true).build()
).setMapping("foo", "type=dense_vector,similarity=l2_norm", "bar", "type=sparse_vector").get();
assertAcked(resp1);
var resp2 = prepareCreate("test_reindex").setSettings(
Settings.builder().put(IndexSettings.INDEX_MAPPING_SOURCE_SYNTHETIC_VECTORS_SETTING.getKey(), true).build()
).setMapping("foo", "type=dense_vector,similarity=l2_norm", "bar", "type=sparse_vector").get();
assertAcked(resp2);
indexRandom(
true,
prepareIndex("test").setId("1").setSource("foo", List.of(3f, 2f, 1.5f), "bar", Map.of("token_1", 4f, "token_2", 7f))
);
var searchResponse = prepareSearch("test").get();
try {
assertThat(searchResponse.getHits().getTotalHits().value(), equalTo(1L));
assertThat(searchResponse.getHits().getHits().length, equalTo(1));
var sourceMap = searchResponse.getHits().getAt(0).getSourceAsMap();
assertThat(sourceMap.size(), equalTo(0));
} finally {
searchResponse.decRef();
}
// Copy all the docs
ReindexRequestBuilder copy = reindex().source("test").destination("test_reindex").refresh(true);
var reindexResponse = copy.get();
assertThat(reindexResponse, matcher().created(1));
searchResponse = prepareSearch("test_reindex").get();
try {
assertThat(searchResponse.getHits().getTotalHits().value(), equalTo(1L));
assertThat(searchResponse.getHits().getHits().length, equalTo(1));
var sourceMap = searchResponse.getHits().getAt(0).getSourceAsMap();
assertThat(sourceMap.size(), equalTo(0));
} finally {
searchResponse.decRef();
}
searchResponse = prepareSearch("test_reindex").setExcludeVectors(false).get();
try {
assertThat(searchResponse.getHits().getTotalHits().value(), equalTo(1L));
assertThat(searchResponse.getHits().getHits().length, equalTo(1));
var sourceMap = searchResponse.getHits().getAt(0).getSourceAsMap();
assertThat(sourceMap.get("foo"), anyOf(equalTo(List.of(3f, 2f, 1.5f)), equalTo(List.of(3d, 2d, 1.5d))));
assertThat(
sourceMap.get("bar"),
anyOf(equalTo(Map.of("token_1", 4f, "token_2", 7f)), equalTo(Map.of("token_1", 4d, "token_2", 7d)))
);
} finally {
searchResponse.decRef();
}
}
}

View File

@@ -10,6 +10,8 @@
package org.elasticsearch.reindex;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.UpdateByQueryRequestBuilder;
@@ -23,7 +25,10 @@ import java.util.Map;
import java.util.stream.Collectors;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
public class UpdateByQueryBasicTests extends ReindexTestCase {
@@ -150,4 +155,54 @@ public class UpdateByQueryBasicTests extends ReindexTestCase {
.get();
assertThat(response, matcher().updated(0).slices(hasSize(0)));
}
public void testUpdateByQueryIncludeVectors() throws Exception {
var resp1 = prepareCreate("test").setSettings(
Settings.builder().put(IndexSettings.INDEX_MAPPING_SOURCE_SYNTHETIC_VECTORS_SETTING.getKey(), true).build()
).setMapping("foo", "type=dense_vector,similarity=l2_norm", "bar", "type=sparse_vector").get();
assertAcked(resp1);
indexRandom(
true,
prepareIndex("test").setId("1").setSource("foo", List.of(3.0f, 2.0f, 1.5f), "bar", Map.of("token_1", 4f, "token_2", 7f))
);
var searchResponse = prepareSearch("test").get();
try {
assertThat(searchResponse.getHits().getTotalHits().value(), equalTo(1L));
assertThat(searchResponse.getHits().getHits().length, equalTo(1));
var sourceMap = searchResponse.getHits().getAt(0).getSourceAsMap();
assertThat(sourceMap.size(), equalTo(0));
} finally {
searchResponse.decRef();
}
// Copy all the docs
var updateByQueryResponse = updateByQuery().source("test").refresh(true).get();
assertThat(updateByQueryResponse, matcher().updated(1L));
searchResponse = prepareSearch("test").get();
try {
assertThat(searchResponse.getHits().getTotalHits().value(), equalTo(1L));
assertThat(searchResponse.getHits().getHits().length, equalTo(1));
var sourceMap = searchResponse.getHits().getAt(0).getSourceAsMap();
assertThat(sourceMap.size(), equalTo(0));
} finally {
searchResponse.decRef();
}
searchResponse = prepareSearch("test").setExcludeVectors(false).get();
try {
assertThat(searchResponse.getHits().getTotalHits().value(), equalTo(1L));
assertThat(searchResponse.getHits().getHits().length, equalTo(1));
var sourceMap = searchResponse.getHits().getAt(0).getSourceAsMap();
assertThat(sourceMap.get("foo"), anyOf(equalTo(List.of(3f, 2f, 1.5f)), equalTo(List.of(3d, 2d, 1.5d))));
assertThat(
sourceMap.get("bar"),
anyOf(equalTo(Map.of("token_1", 4f, "token_2", 7f)), equalTo(Map.of("token_1", 4d, "token_2", 7d)))
);
} finally {
searchResponse.decRef();
}
}
}

View File

@@ -211,14 +211,29 @@ public class RemoteRequestBuildersTests extends ESTestCase {
SearchRequest searchRequest = new SearchRequest();
searchRequest.source(new SearchSourceBuilder());
// always set by AbstractAsyncBulkByScrollAction#prepareSearchRequest
searchRequest.source().excludeVectors(false);
String query = "{\"match_all\":{}}";
HttpEntity entity = initialSearch(searchRequest, new BytesArray(query), remoteVersion).getEntity();
assertEquals(ContentType.APPLICATION_JSON.toString(), entity.getContentType().getValue());
if (remoteVersion.onOrAfter(Version.fromId(1000099))) {
assertEquals(
"{\"query\":" + query + ",\"_source\":true}",
Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8))
);
if (remoteVersion.onOrAfter(Version.V_9_1_0)) {
// vectors are automatically included on recent versions
assertEquals(XContentHelper.stripWhitespace(Strings.format("""
{
"query": %s,
"_source": {
"exclude_vectors":false,
"includes": [],
"excludes": []
}
}""", query)), Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)));
} else {
assertEquals(
"{\"query\":" + query + ",\"_source\":true}",
Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8))
);
}
} else {
assertEquals(
"{\"query\":" + query + "}",
@@ -230,14 +245,27 @@
searchRequest.source().fetchSource(new String[] { "in1", "in2" }, new String[] { "out" });
entity = initialSearch(searchRequest, new BytesArray(query), remoteVersion).getEntity();
assertEquals(ContentType.APPLICATION_JSON.toString(), entity.getContentType().getValue());
assertEquals(XContentHelper.stripWhitespace(Strings.format("""
{
"query": %s,
"_source": {
"includes": [ "in1", "in2" ],
"excludes": [ "out" ]
}
}""", query)), Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)));
if (remoteVersion.onOrAfter(Version.V_9_1_0)) {
// vectors are automatically included on recent versions
assertEquals(XContentHelper.stripWhitespace(Strings.format("""
{
"query": %s,
"_source": {
"exclude_vectors":false,
"includes": [ "in1", "in2" ],
"excludes": [ "out" ]
}
}""", query)), Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)));
} else {
assertEquals(XContentHelper.stripWhitespace(Strings.format("""
{
"query": %s,
"_source": {
"includes": [ "in1", "in2" ],
"excludes": [ "out" ]
}
}""", query)), Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8)));
}
// Invalid XContent fails
RuntimeException e = expectThrows(

View File

@@ -266,6 +266,14 @@ public class SearchRequestBuilder extends ActionRequestBuilder<SearchRequest, Se
return this;
}
/**
* Indicate whether vectors should be excluded from the _source.
*/
public SearchRequestBuilder setExcludeVectors(boolean excludeVectors) {
sourceBuilder().excludeVectors(excludeVectors);
return this;
}
/**
* Adds a docvalue based field to load and return. The field does not have to be stored,
* but its recommended to use non analyzed or numeric fields.
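
The new SearchRequestBuilder#setExcludeVectors mirrors how the integration tests above request vectors back; a small usage sketch (the wrapper class is illustrative, and `client` is assumed to be whatever Client instance is in scope):

import java.util.Map;

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.index.query.QueryBuilders;

final class ExcludeVectorsUsageSketch {
    static Map<String, Object> sourceWithVectors(Client client, String index) {
        SearchResponse resp = client.prepareSearch(index)
            .setQuery(QueryBuilders.matchAllQuery())
            .setExcludeVectors(false) // keep vector fields in the returned _source
            .get();
        try {
            return resp.getHits().getAt(0).getSourceAsMap();
        } finally {
            resp.decRef();
        }
    }
}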

View File

@@ -878,7 +878,12 @@ public final class SearchSourceBuilder implements Writeable, ToXContentObject, R
*/
public SearchSourceBuilder fetchSource(boolean fetch) {
FetchSourceContext fetchSourceContext = this.fetchSourceContext != null ? this.fetchSourceContext : FetchSourceContext.FETCH_SOURCE;
this.fetchSourceContext = FetchSourceContext.of(fetch, fetchSourceContext.includes(), fetchSourceContext.excludes());
this.fetchSourceContext = FetchSourceContext.of(
fetch,
fetchSourceContext.excludeVectors(),
fetchSourceContext.includes(),
fetchSourceContext.excludes()
);
return this;
}
@@ -915,7 +920,12 @@ public final class SearchSourceBuilder implements Writeable, ToXContentObject, R
*/
public SearchSourceBuilder fetchSource(@Nullable String[] includes, @Nullable String[] excludes) {
FetchSourceContext fetchSourceContext = this.fetchSourceContext != null ? this.fetchSourceContext : FetchSourceContext.FETCH_SOURCE;
this.fetchSourceContext = FetchSourceContext.of(fetchSourceContext.fetchSource(), includes, excludes);
this.fetchSourceContext = FetchSourceContext.of(
fetchSourceContext.fetchSource(),
fetchSourceContext.excludeVectors(),
includes,
excludes
);
return this;
}
@@ -927,6 +937,20 @@ public final class SearchSourceBuilder implements Writeable, ToXContentObject, R
return this;
}
/**
* Indicate whether vectors should be excluded from the _source.
*/
public SearchSourceBuilder excludeVectors(boolean excludeVectors) {
FetchSourceContext fetchSourceContext = this.fetchSourceContext != null ? this.fetchSourceContext : FetchSourceContext.FETCH_SOURCE;
this.fetchSourceContext = FetchSourceContext.of(
fetchSourceContext.fetchSource(),
excludeVectors,
fetchSourceContext.includes(),
fetchSourceContext.excludes()
);
return this;
}
/**
* Gets the {@link FetchSourceContext} which defines how the _source should
* be fetched.
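
As a usage note, the exclude_vectors flag is carried inside the FetchSourceContext, so the reworked fetchSource overloads above no longer drop it when includes/excludes are set afterwards. A small hedged sketch (class name illustrative):

import org.elasticsearch.search.builder.SearchSourceBuilder;

final class ExcludeVectorsPreservedSketch {
    static void demo() {
        SearchSourceBuilder source = new SearchSourceBuilder()
            .excludeVectors(false)                         // explicitly request vectors
            .fetchSource(new String[] { "title" }, null);  // later filtering keeps the flag
        // With this change the context still reports exclude_vectors=false here.
        assert Boolean.FALSE.equals(source.fetchSource().excludeVectors());
    }
}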