ESQL: Speed loading stored fields (#127348)

This speeds up loading from stored fields by opting more blocks into the
"sequential" strategy. This really kicks in when loading stored fields
like `text`, and when you need fewer than 100% of the documents but more
than, say, 10%. It's most useful when you need something like 99.9% of
the documents. Here are the perf numbers:
```
%100.0 {"took": 403 -> 401,"documents_found":1000000}
%099.9 {"took":3990 -> 436,"documents_found": 999000}
%099.0 {"took":4069 -> 440,"documents_found": 990000}
%090.0 {"took":3468 -> 421,"documents_found": 900000}
%030.0 {"took":1213 -> 152,"documents_found": 300000}
%020.0 {"took": 766 -> 104,"documents_found": 200000}
%010.0 {"took": 397 ->  55,"documents_found": 100000}
%009.0 {"took": 352 -> 375,"documents_found":  90000}
%008.0 {"took": 304 -> 317,"documents_found":  80000}
%007.0 {"took": 273 -> 287,"documents_found":  70000}
%005.0 {"took": 199 -> 204,"documents_found":  50000}
%001.0 {"took":  46 ->  46,"documents_found":  10000}
```

Let's explain this with an example. First, jump to `main` and load a
million documents:
```
rm -f /tmp/bulk
for a in {1..1000}; do
    echo '{"index":{}}' >> /tmp/bulk
    echo '{"text":"text '$(printf %04d $a)'"}' >> /tmp/bulk
done

curl -s -uelastic:password -HContent-Type:application/json -XDELETE localhost:9200/test
for a in {1..1000}; do
    echo -n $a:
    curl -s -uelastic:password -HContent-Type:application/json -XPOST localhost:9200/test/_bulk?pretty --data-binary @/tmp/bulk | grep errors
done
curl -s -uelastic:password -HContent-Type:application/json -XPOST localhost:9200/test/_forcemerge?max_num_segments=1
curl -s -uelastic:password -HContent-Type:application/json -XPOST localhost:9200/test/_refresh
echo
```

Now query them all. Run this a few times until it's stable:
```
echo -n "%100.0 "
curl -s -uelastic:password -HContent-Type:application/json -XPOST 'localhost:9200/_query?pretty' -d'{
    "query": "FROM test | STATS SUM(LENGTH(text))",
    "pragma": {
        "data_partitioning": "shard"
    }
}' | jq -c '{took, documents_found}'
```

Now fetch 99.9% of documents:
```
echo -n "%099.9 "
curl -s -uelastic:password -HContent-Type:application/json -XPOST 'localhost:9200/_query?pretty' -d'{
    "query": "FROM test | WHERE NOT text.keyword IN (\"text 0998\") | STATS SUM(LENGTH(text))",
    "pragma": {
        "data_partitioning": "shard"
    }
}' | jq -c '{took, documents_found}'
```

This should spit out something like:
```
%100.0 { "took":403,"documents_found":1000000}
%099.9 {"took":4098, "documents_found":999000}
```

We're loading *fewer* documents but it's slower! What in the world?!
If you dig into the profile you'll see the time is going to value loading:
```
$ curl -s -uelastic:password -HContent-Type:application/json -XPOST 'localhost:9200/_query?pretty' -d'{
    "query": "FROM test | STATS SUM(LENGTH(text))",
    "pragma": {
        "data_partitioning": "shard"
    },
    "profile": true
}' | jq '.profile.drivers[].operators[] | select(.operator | contains("ValuesSourceReaderOperator"))'
{
  "operator": "ValuesSourceReaderOperator[fields = [text]]",
  "status": {
    "readers_built": {
      "stored_fields[requires_source:true, fields:0, sequential: true]": 222,
      "text:column_at_a_time:null": 222,
      "text:row_stride:BlockSourceReader.Bytes": 1
    },
    "values_loaded": 1000000,
    "process_nanos": 370687157,
    "pages_processed": 222,
    "rows_received": 1000000,
    "rows_emitted": 1000000
  }
}
$ curl -s -uelastic:password -HContent-Type:application/json -XPOST 'localhost:9200/_query?pretty' -d'{
    "query": "FROM test | WHERE NOT text.keyword IN (\"text 0998\") | STATS SUM(LENGTH(text))",
    "pragma": {
        "data_partitioning": "shard"
    },
    "profile": true
}' | jq '.profile.drivers[].operators[] | select(.operator | contains("ValuesSourceReaderOperator"))'
{
  "operator": "ValuesSourceReaderOperator[fields = [text]]",
  "status": {
    "readers_built": {
      "stored_fields[requires_source:true, fields:0, sequential: false]": 222,
      "text:column_at_a_time:null": 222,
      "text:row_stride:BlockSourceReader.Bytes": 1
    },
    "values_loaded": 999000,
    "process_nanos": 3965803793,
    "pages_processed": 222,
    "rows_received": 999000,
    "rows_emitted": 999000
  }
}
```

It jumps from 370ms to almost four seconds, while loading *fewer* values! The
second big difference is the `stored_fields` marker: in the second run it's
`sequential: false` and in the first it's `sequential: true`.

`sequential: true` uses Lucene's "merge" stored fields reader instead of
the default one. It's much better optimized for decoding sequences of
documents.
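
Roughly, this boils down to which `StoredFieldLoader` the operator builds for a
block. Here's a minimal Java sketch, not the PR's exact code: `fromSpecSequential`
shows up in the diff below, while `fromSpec`, the package names, and the helper
method are assumed for illustration.
```
import org.elasticsearch.search.fetch.StoredFieldLoader;
import org.elasticsearch.search.fetch.StoredFieldsSpec;

class LoaderChoiceSketch {
    /**
     * Pick a stored field loader for one block of doc ids. When the block is
     * dense enough, the sequential loader is used; it's backed by Lucene's
     * "merge" stored fields reader, which decodes runs of documents quickly.
     */
    static StoredFieldLoader loaderFor(StoredFieldsSpec spec, boolean denseEnough) {
        return denseEnough
            ? StoredFieldLoader.fromSpecSequential(spec) // "merge" reader
            : StoredFieldLoader.fromSpec(spec);          // default random-access reader
    }
}
```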

Previously we only enabled this reader when loading compact sequences of
documents - when the entire block looks like
```
1, 2, 3, 4, 5, ... 1230, 1231
```

If there were any gaps we wouldn't enable it. That was a very
conservative choice we made long ago without running any experiments. We
knew the merge reader was faster when there were no gaps, but not otherwise.
It turns out it's a lot faster in a lot more cases: I've measured it as
faster even with 99% gaps, at least on simple documents. I'm a bit worried
that this is too aggressive, so I've made it configurable and set the
default to only use the "merge" loader when the documents we need make up at
least 20% of the doc id range they span. So we'd use the merge loader with a
block like:
```
1, 6, 11, 16, ..., 1236, 1241
```
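
To make the cutoff concrete, here's a self-contained Java sketch that mirrors
the new `useSequentialStoredFieldsReader` check from the diff below. The class,
the boundary value, and the doc id arrays are made up for the demo; only the
final `range * proportion <= count` test is the real logic.
```
import java.util.stream.IntStream;

class SequentialHeuristicSketch {
    // Minimum block size before the sequential reader is even considered. The
    // real constant lives in ValuesSourceReaderOperator; 10 is a demo value.
    static final int SEQUENTIAL_BOUNDARY = 10;

    static boolean useSequential(int[] docIds, double proportion) {
        int count = docIds.length;
        if (count < SEQUENTIAL_BOUNDARY) {
            return false;
        }
        // proportion is the minimum density: at least that fraction of the doc
        // ids spanned by the block must actually be needed.
        int range = docIds[count - 1] - docIds[0];
        return range * proportion <= count;
    }

    public static void main(String[] args) {
        int[] oneInFive = IntStream.iterate(1, i -> i + 5).limit(249).toArray(); // 1, 6, ..., 1241
        int[] oneInTen = IntStream.iterate(1, i -> i + 10).limit(125).toArray(); // 1, 11, ..., 1241
        System.out.println(useSequential(oneInFive, 0.2)); // true: dense enough for the 0.2 default
        System.out.println(useSequential(oneInTen, 0.2));  // false: too sparse for the default
        System.out.println(useSequential(oneInTen, 0.1));  // true once the setting is lowered
    }
}
```

The threshold is the new dynamic, per-index setting
`index.esql.stored_fields_sequential_proportion` added below, so it can be
lowered for indices where the sequential reader keeps winning.
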
Nik Everett 2025-04-29 17:20:15 -04:00 committed by GitHub
parent 34ebf8bb09
commit 10336c950c
14 changed files with 390 additions and 33 deletions


@ -25,6 +25,7 @@ import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.NumericUtils;
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BytesRefBlock;
@ -50,6 +51,7 @@ import org.elasticsearch.index.mapper.KeywordFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.search.lookup.SearchLookup;
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
@ -335,7 +337,7 @@ public class ValuesSourceReaderBenchmark {
fields(name),
List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> {
throw new UnsupportedOperationException("can't load _source here");
})),
}, EsqlPlugin.STORED_FIELDS_SEQUENTIAL_PROPORTION.getDefault(Settings.EMPTY))),
0
);
long sum = 0;


@ -0,0 +1,5 @@
pr: 127348
summary: Speed loading stored fields
area: ES|QL
type: enhancement
issues: []


@ -49,7 +49,7 @@ $$$index-codec$$$ `index.codec`
$$$index-mode-setting$$$ `index.mode`
: The `index.mode` setting is used to control settings applied in specific domains like ingestion of time series data or logs. Different mutually exclusive modes exist, which are used to apply settings or default values controlling indexing of documents, sorting and other parameters whose value affects indexing or query performance.
**Example**
```console
@ -248,3 +248,8 @@ $$$index-final-pipeline$$$
$$$index-hidden$$$ `index.hidden`
: Indicates whether the index should be hidden by default. Hidden indices are not returned by default when using a wildcard expression. This behavior is controlled per request through the use of the `expand_wildcards` parameter. Possible values are `true` and `false` (default).
$$$index-esql-stored-fields-sequential-proportion$$$
`index.esql.stored_fields_sequential_proportion`
: Tuning parameter for deciding when {{esql}} will load [Stored fields](/reference/elasticsearch/rest-apis/retrieve-selected-fields.md#stored-fields) using a strategy tuned for loading dense sequence of documents. Allows values between 0.0 and 1.0 and defaults to 0.2. Indices with documents smaller than 10kb may see speed improvements loading `text` fields by setting this lower.


@ -107,7 +107,7 @@ public class ValuesSourceReaderOperator extends AbstractPageMappingOperator {
*/
public record FieldInfo(String name, ElementType type, IntFunction<BlockLoader> blockLoader) {}
public record ShardContext(IndexReader reader, Supplier<SourceLoader> newSourceLoader) {}
public record ShardContext(IndexReader reader, Supplier<SourceLoader> newSourceLoader, double storedFieldsSequentialProportion) {}
private final FieldWork[] fields;
private final List<ShardContext> shardContexts;
@ -247,8 +247,9 @@ public class ValuesSourceReaderOperator extends AbstractPageMappingOperator {
}
SourceLoader sourceLoader = null;
ShardContext shardContext = shardContexts.get(shard);
if (storedFieldsSpec.requiresSource()) {
sourceLoader = shardContexts.get(shard).newSourceLoader.get();
sourceLoader = shardContext.newSourceLoader.get();
storedFieldsSpec = storedFieldsSpec.merge(new StoredFieldsSpec(true, false, sourceLoader.requiredStoredFields()));
}
@ -261,7 +262,7 @@ public class ValuesSourceReaderOperator extends AbstractPageMappingOperator {
);
}
StoredFieldLoader storedFieldLoader;
if (useSequentialStoredFieldsReader(docs)) {
if (useSequentialStoredFieldsReader(docs, shardContext.storedFieldsSequentialProportion())) {
storedFieldLoader = StoredFieldLoader.fromSpecSequential(storedFieldsSpec);
trackStoredFields(storedFieldsSpec, true);
} else {
@ -438,9 +439,13 @@ public class ValuesSourceReaderOperator extends AbstractPageMappingOperator {
* Is it more efficient to use a sequential stored field reader
* when reading stored fields for the documents contained in {@code docIds}?
*/
private boolean useSequentialStoredFieldsReader(BlockLoader.Docs docs) {
private boolean useSequentialStoredFieldsReader(BlockLoader.Docs docs, double storedFieldsSequentialProportion) {
int count = docs.count();
return count >= SEQUENTIAL_BOUNDARY && docs.get(count - 1) - docs.get(0) == count - 1;
if (count < SEQUENTIAL_BOUNDARY) {
return false;
}
int range = docs.get(count - 1) - docs.get(0);
return range * storedFieldsSequentialProportion <= count;
}
private void trackStoredFields(StoredFieldsSpec spec, boolean sequential) {


@ -198,7 +198,7 @@ public class OperatorTests extends MapperServiceTestCase {
operators.add(
new OrdinalsGroupingOperator(
shardIdx -> new KeywordFieldMapper.KeywordFieldType("g").blockLoader(mockBlContext()),
List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE)),
List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE, 0.2)),
ElementType.BYTES_REF,
0,
gField,


@ -208,7 +208,7 @@ public abstract class LuceneQueryEvaluatorTests<T extends Vector, U extends Vect
),
List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> {
throw new UnsupportedOperationException();
})),
}, 0.2)),
0
)
);


@ -200,7 +200,7 @@ public class ValueSourceReaderTypeConversionTests extends AnyOperatorTestCase {
private List<ValuesSourceReaderOperator.ShardContext> initShardContexts() {
return INDICES.keySet()
.stream()
.map(index -> new ValuesSourceReaderOperator.ShardContext(reader(index), () -> SourceLoader.FROM_STORED_SOURCE))
.map(index -> new ValuesSourceReaderOperator.ShardContext(reader(index), () -> SourceLoader.FROM_STORED_SOURCE, 0.2))
.toList();
}
@ -1297,7 +1297,7 @@ public class ValueSourceReaderTypeConversionTests extends AnyOperatorTestCase {
LuceneOperator.NO_LIMIT,
false // no scoring
);
var vsShardContext = new ValuesSourceReaderOperator.ShardContext(reader(indexKey), () -> SourceLoader.FROM_STORED_SOURCE);
var vsShardContext = new ValuesSourceReaderOperator.ShardContext(reader(indexKey), () -> SourceLoader.FROM_STORED_SOURCE, 0.2);
try (
Driver driver = TestDriverFactory.create(
driverContext,
@ -1415,7 +1415,7 @@ public class ValueSourceReaderTypeConversionTests extends AnyOperatorTestCase {
ValuesSourceReaderOperator.Factory factory = new ValuesSourceReaderOperator.Factory(
cases.stream().map(c -> c.info).toList(),
List.of(new ValuesSourceReaderOperator.ShardContext(reader(indexKey), () -> SourceLoader.FROM_STORED_SOURCE)),
List.of(new ValuesSourceReaderOperator.ShardContext(reader(indexKey), () -> SourceLoader.FROM_STORED_SOURCE, 0.2)),
0
);
assertThat(factory.describe(), equalTo("ValuesSourceReaderOperator[fields = [" + cases.size() + " fields]]"));
@ -1443,7 +1443,9 @@ public class ValueSourceReaderTypeConversionTests extends AnyOperatorTestCase {
List<ValuesSourceReaderOperator.ShardContext> readerShardContexts = new ArrayList<>();
for (int s = 0; s < shardCount; s++) {
contexts.add(new LuceneSourceOperatorTests.MockShardContext(readers[s], s));
readerShardContexts.add(new ValuesSourceReaderOperator.ShardContext(readers[s], () -> SourceLoader.FROM_STORED_SOURCE));
readerShardContexts.add(
new ValuesSourceReaderOperator.ShardContext(readers[s], () -> SourceLoader.FROM_STORED_SOURCE, 0.2)
);
}
var luceneFactory = new LuceneSourceOperator.Factory(
contexts,


@ -114,6 +114,8 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
{ false, true, true },
{ false, false, true, true } };
static final double STORED_FIELDS_SEQUENTIAL_PROPORTIONS = 0.2;
private Directory directory = newDirectory();
private MapperService mapperService;
private IndexReader reader;
@ -147,7 +149,16 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
fail("unexpected shardIdx [" + shardIdx + "]");
}
return loader;
})), List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE)), 0);
})),
List.of(
new ValuesSourceReaderOperator.ShardContext(
reader,
() -> SourceLoader.FROM_STORED_SOURCE,
STORED_FIELDS_SEQUENTIAL_PROPORTIONS
)
),
0
);
}
@Override
@ -443,7 +454,13 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
operators.add(
new ValuesSourceReaderOperator.Factory(
List.of(testCase.info, fieldInfo(mapperService.fieldType("key"), ElementType.INT)),
List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE)),
List.of(
new ValuesSourceReaderOperator.ShardContext(
reader,
() -> SourceLoader.FROM_STORED_SOURCE,
STORED_FIELDS_SEQUENTIAL_PROPORTIONS
)
),
0
).get(driverContext)
);
@ -549,7 +566,13 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
operators.add(
new ValuesSourceReaderOperator.Factory(
List.of(fieldInfo(mapperService.fieldType("key"), ElementType.INT)),
List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE)),
List.of(
new ValuesSourceReaderOperator.ShardContext(
reader,
() -> SourceLoader.FROM_STORED_SOURCE,
STORED_FIELDS_SEQUENTIAL_PROPORTIONS
)
),
0
).get(driverContext)
);
@ -561,7 +584,13 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
operators.add(
new ValuesSourceReaderOperator.Factory(
b.stream().map(i -> i.info).toList(),
List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE)),
List.of(
new ValuesSourceReaderOperator.ShardContext(
reader,
() -> SourceLoader.FROM_STORED_SOURCE,
STORED_FIELDS_SEQUENTIAL_PROPORTIONS
)
),
0
).get(driverContext)
);
@ -651,7 +680,13 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
.map(
i -> new ValuesSourceReaderOperator.Factory(
List.of(i.info),
List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE)),
List.of(
new ValuesSourceReaderOperator.ShardContext(
reader,
() -> SourceLoader.FROM_STORED_SOURCE,
STORED_FIELDS_SEQUENTIAL_PROPORTIONS
)
),
0
).get(driverContext)
)
@ -1417,7 +1452,13 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
new ValuesSourceReaderOperator.FieldInfo("null1", ElementType.NULL, shardIdx -> BlockLoader.CONSTANT_NULLS),
new ValuesSourceReaderOperator.FieldInfo("null2", ElementType.NULL, shardIdx -> BlockLoader.CONSTANT_NULLS)
),
List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE)),
List.of(
new ValuesSourceReaderOperator.ShardContext(
reader,
() -> SourceLoader.FROM_STORED_SOURCE,
STORED_FIELDS_SEQUENTIAL_PROPORTIONS
)
),
0
).get(driverContext)
),
@ -1462,7 +1503,13 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
fieldInfo(mapperService.fieldType("key"), ElementType.INT),
fieldInfo(storedTextField("stored_text"), ElementType.BYTES_REF)
),
List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE)),
List.of(
new ValuesSourceReaderOperator.ShardContext(
reader,
() -> SourceLoader.FROM_STORED_SOURCE,
STORED_FIELDS_SEQUENTIAL_PROPORTIONS
)
),
0
).get(driverContext);
List<Page> results = drive(op, source.iterator(), driverContext);
@ -1490,7 +1537,13 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
ValuesSourceReaderOperator.Factory factory = new ValuesSourceReaderOperator.Factory(
cases.stream().map(c -> c.info).toList(),
List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE)),
List.of(
new ValuesSourceReaderOperator.ShardContext(
reader,
() -> SourceLoader.FROM_STORED_SOURCE,
STORED_FIELDS_SEQUENTIAL_PROPORTIONS
)
),
0
);
assertThat(factory.describe(), equalTo("ValuesSourceReaderOperator[fields = [" + cases.size() + " fields]]"));
@ -1517,7 +1570,13 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
List<ValuesSourceReaderOperator.ShardContext> readerShardContexts = new ArrayList<>();
for (int s = 0; s < shardCount; s++) {
contexts.add(new LuceneSourceOperatorTests.MockShardContext(readers[s], s));
readerShardContexts.add(new ValuesSourceReaderOperator.ShardContext(readers[s], () -> SourceLoader.FROM_STORED_SOURCE));
readerShardContexts.add(
new ValuesSourceReaderOperator.ShardContext(
readers[s],
() -> SourceLoader.FROM_STORED_SOURCE,
STORED_FIELDS_SEQUENTIAL_PROPORTIONS
)
);
}
var luceneFactory = new LuceneSourceOperator.Factory(
contexts,


@ -0,0 +1,219 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.esql.qa.single_node;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.test.MapMatcher;
import org.elasticsearch.test.TestClustersThreadFilter;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xpack.esql.AssertWarnings;
import org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase;
import org.junit.Before;
import org.junit.ClassRule;
import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.elasticsearch.test.MapMatcher.assertMap;
import static org.elasticsearch.test.MapMatcher.matchesMap;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.entityToMap;
import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.requestObjectBuilder;
import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.runEsql;
import static org.elasticsearch.xpack.esql.qa.single_node.RestEsqlIT.commonProfile;
import static org.elasticsearch.xpack.esql.qa.single_node.RestEsqlIT.fixTypesOnProfile;
import static org.hamcrest.Matchers.*;
/**
* Tests for {@code index.esql.stored_fields_sequential_proportion} which controls
* an optimization we use when loading from {@code _source}.
*/
@ThreadLeakFilters(filters = TestClustersThreadFilter.class)
public class StoredFieldsSequentialIT extends ESRestTestCase {
private static final Logger LOG = LogManager.getLogger(StoredFieldsSequentialIT.class);
@ClassRule
public static ElasticsearchCluster cluster = Clusters.testCluster();
public void testFetchTen() throws IOException {
testQuery(null, """
FROM test
| LIMIT 10
""", 10, true);
}
public void testAggAll() throws IOException {
testQuery(null, """
FROM test
| STATS SUM(LENGTH(test))
""", 1000, true);
}
public void testAggTwentyPercent() throws IOException {
testQuery(null, """
FROM test
| WHERE STARTS_WITH(test.keyword, "test1") OR STARTS_WITH(test.keyword, "test2")
| STATS SUM(LENGTH(test))
""", 200, true);
}
public void testAggTenPercentDefault() throws IOException {
testAggTenPercent(null, false);
}
public void testAggTenPercentConfiguredToTenPct() throws IOException {
testAggTenPercent(0.10, true);
}
public void testAggTenPercentConfiguredToOnePct() throws IOException {
testAggTenPercent(0.01, true);
}
/**
* It's important for the test that the queries we use detect "scattered" docs.
* If they were "compact" in the index we'd still load them using the sequential
* reader.
*/
private void testAggTenPercent(Double percent, boolean sequential) throws IOException {
String filter = IntStream.range(0, 10)
.mapToObj(i -> String.format(Locale.ROOT, "STARTS_WITH(test.keyword, \"test%s%s\")", i, i))
.collect(Collectors.joining(" OR "));
testQuery(percent, String.format(Locale.ROOT, """
FROM test
| WHERE %s
| STATS SUM(LENGTH(test))
""", filter), 100, sequential);
}
private void testQuery(Double percent, String query, int documentsFound, boolean sequential) throws IOException {
setPercent(percent);
RestEsqlTestCase.RequestObjectBuilder builder = requestObjectBuilder().query(query);
builder.profile(true);
Map<String, Object> result = runEsql(builder, new AssertWarnings.NoWarnings(), RestEsqlTestCase.Mode.SYNC);
assertMap(
result,
matchesMap().entry("documents_found", documentsFound)
.entry(
"profile",
matchesMap().entry("drivers", instanceOf(List.class))
.entry("planning", matchesMap().extraOk())
.entry("query", matchesMap().extraOk())
)
.extraOk()
);
@SuppressWarnings("unchecked")
List<Map<String, Object>> profiles = (List<Map<String, Object>>) ((Map<String, Object>) result.get("profile")).get("drivers");
for (Map<String, Object> p : profiles) {
fixTypesOnProfile(p);
assertThat(p, commonProfile());
@SuppressWarnings("unchecked")
List<Map<String, Object>> operators = (List<Map<String, Object>>) p.get("operators");
for (Map<String, Object> o : operators) {
LOG.info("profile {}", o.get("operator"));
}
for (Map<String, Object> o : operators) {
checkOperatorProfile(o, sequential);
}
}
}
private void setPercent(Double percent) throws IOException {
Request set = new Request("PUT", "test/_settings");
set.setJsonEntity(String.format(Locale.ROOT, """
{
"index": {
"esql": {
"stored_fields_sequential_proportion": %s
}
}
}
""", percent));
assertMap(entityToMap(client().performRequest(set).getEntity(), XContentType.JSON), matchesMap().entry("acknowledged", true));
}
@Before
public void buildIndex() throws IOException {
Request exists = new Request("GET", "test");
try {
client().performRequest(exists);
return;
} catch (ResponseException e) {
if (e.getResponse().getStatusLine().getStatusCode() != 404) {
throw e;
}
}
Request createIndex = new Request("PUT", "test");
createIndex.setJsonEntity("""
{
"settings": {
"index": {
"number_of_shards": 1,
"sort.field": "i"
}
},
"mappings": {
"properties": {
"i": {"type": "long"}
}
}
}""");
Response createResponse = client().performRequest(createIndex);
assertThat(
entityToMap(createResponse.getEntity(), XContentType.JSON),
matchesMap().entry("shards_acknowledged", true).entry("index", "test").entry("acknowledged", true)
);
Request bulk = new Request("POST", "/_bulk");
bulk.addParameter("refresh", "");
StringBuilder b = new StringBuilder();
for (int i = 0; i < 1000; i++) {
b.append(String.format("""
{"create":{"_index":"test"}}
{"test":"test%03d", "i": %d}
""", i, i));
}
bulk.setJsonEntity(b.toString());
Response bulkResponse = client().performRequest(bulk);
assertThat(entityToMap(bulkResponse.getEntity(), XContentType.JSON), matchesMap().entry("errors", false).extraOk());
}
@Override
protected boolean preserveIndicesUponCompletion() {
return true;
}
private static void checkOperatorProfile(Map<String, Object> o, boolean sequential) {
String name = (String) o.get("operator");
if (name.startsWith("ValuesSourceReaderOperator")) {
MapMatcher readersBuilt = matchesMap().entry(
"stored_fields[requires_source:true, fields:0, sequential: " + sequential + "]",
greaterThanOrEqualTo(1)
).extraOk();
MapMatcher expectedOp = matchesMap().entry("operator", name)
.entry("status", matchesMap().entry("readers_built", readersBuilt).extraOk());
assertMap(o, expectedOp);
}
}
@Override
protected String getTestRestCluster() {
return cluster.getHttpAddresses();
}
}


@ -204,7 +204,7 @@ public class LookupFromIndexIT extends AbstractEsqlIntegTestCase {
),
List.of(new ValuesSourceReaderOperator.ShardContext(searchContext.getSearchExecutionContext().getIndexReader(), () -> {
throw new IllegalStateException("can't load source here");
})),
}, EsqlPlugin.STORED_FIELDS_SEQUENTIAL_PROPORTION.getDefault(Settings.EMPTY))),
0
);
CancellableTask parentTask = new EsqlQueryTask(


@ -21,6 +21,7 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedBiFunction;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.compute.data.Block;
@ -408,7 +409,13 @@ public abstract class AbstractLookupService<R extends AbstractLookupService.Requ
return new ValuesSourceReaderOperator(
driverContext.blockFactory(),
fields,
List.of(new ValuesSourceReaderOperator.ShardContext(shardContext.searcher().getIndexReader(), shardContext::newSourceLoader)),
List.of(
new ValuesSourceReaderOperator.ShardContext(
shardContext.searcher().getIndexReader(),
shardContext::newSourceLoader,
EsqlPlugin.STORED_FIELDS_SEQUENTIAL_PROPORTION.getDefault(Settings.EMPTY)
)
),
0
);
}


@ -65,6 +65,7 @@ import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesSourceExec;
import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.DriverParallelism;
import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.LocalExecutionPlannerContext;
import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.PhysicalOperation;
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
import java.io.IOException;
import java.util.ArrayList;
@ -91,6 +92,16 @@ public class EsPhysicalOperationProviders extends AbstractPhysicalOperationProvi
* Convert a {@link QueryBuilder} into a real {@link Query lucene query}.
*/
Query toQuery(QueryBuilder queryBuilder);
/**
* Tuning parameter for deciding when to use the "merge" stored field loader.
* Think of it as "how similar to a sequential block of documents do I have to
* be before I'll use the merge reader?" So a value of {@code 1} means I have to
* be <strong>exactly</strong> a sequential block, like {@code 0, 1, 2, 3, .. 1299, 1300}.
* A value of {@code .2} means we'll use the sequential reader even if we only
* need one in ten documents.
*/
double storedFieldsSequentialProportion();
}
private final List<ShardContext> shardContexts;
@ -112,7 +123,13 @@ public class EsPhysicalOperationProviders extends AbstractPhysicalOperationProvi
Layout.Builder layout = source.layout.builder();
var sourceAttr = fieldExtractExec.sourceAttribute();
List<ValuesSourceReaderOperator.ShardContext> readers = shardContexts.stream()
.map(s -> new ValuesSourceReaderOperator.ShardContext(s.searcher().getIndexReader(), s::newSourceLoader))
.map(
s -> new ValuesSourceReaderOperator.ShardContext(
s.searcher().getIndexReader(),
s::newSourceLoader,
s.storedFieldsSequentialProportion()
)
)
.toList();
int docChannel = source.layout.get(sourceAttr.id()).channel();
for (Attribute attr : fieldExtractExec.attributesToExtract()) {
@ -284,7 +301,13 @@ public class EsPhysicalOperationProviders extends AbstractPhysicalOperationProvi
var sourceAttribute = FieldExtractExec.extractSourceAttributesFrom(aggregateExec.child());
int docChannel = source.layout.get(sourceAttribute.id()).channel();
List<ValuesSourceReaderOperator.ShardContext> vsShardContexts = shardContexts.stream()
.map(s -> new ValuesSourceReaderOperator.ShardContext(s.searcher().getIndexReader(), s::newSourceLoader))
.map(
s -> new ValuesSourceReaderOperator.ShardContext(
s.searcher().getIndexReader(),
s::newSourceLoader,
s.storedFieldsSequentialProportion()
)
)
.toList();
// The grouping-by values are ready, let's group on them directly.
// Costin: why are they ready and not already exposed in the layout?
@ -417,6 +440,11 @@ public class EsPhysicalOperationProviders extends AbstractPhysicalOperationProvi
protected @Nullable MappedFieldType fieldType(String name) {
return ctx.getFieldType(name);
}
@Override
public double storedFieldsSequentialProportion() {
return EsqlPlugin.STORED_FIELDS_SEQUENTIAL_PROPORTION.get(ctx.getIndexSettings().getSettings());
}
}
private static class TypeConvertingBlockLoader implements BlockLoader {


@ -536,6 +536,12 @@ public class ComputeService {
new EsPhysicalOperationProviders.DefaultShardContext(i, searchExecutionContext, searchContext.request().getAliasFilter())
);
}
EsPhysicalOperationProviders physicalOperationProviders = new EsPhysicalOperationProviders(
context.foldCtx(),
contexts,
searchService.getIndicesService().getAnalysis(),
defaultDataPartitioning
);
final List<Driver> drivers;
try {
LocalExecutionPlanner planner = new LocalExecutionPlanner(
@ -551,12 +557,7 @@ public class ComputeService {
enrichLookupService,
lookupFromIndexService,
inferenceRunner,
new EsPhysicalOperationProviders(
context.foldCtx(),
contexts,
searchService.getIndicesService().getAnalysis(),
defaultDataPartitioning
),
physicalOperationProviders,
contexts
);


@ -163,6 +163,29 @@ public class EsqlPlugin extends Plugin implements ActionPlugin {
Setting.Property.Dynamic
);
/**
* Tuning parameter for deciding when to use the "merge" stored field loader.
* Think of it as "how similar to a sequential block of documents do I have to
* be before I'll use the merge reader?" So a value of {@code 1} means I have to
* be <strong>exactly</strong> a sequential block, like {@code 0, 1, 2, 3, .. 1299, 1300}.
* A value of {@code .2} means we'll use the sequential reader even if we only
* need one in ten documents.
* <p>
* The default value of this was experimentally derived using a
* <a href="https://gist.github.com/nik9000/ac6857de10745aad210b6397915ff846">script</a>.
* And a little paranoia. A lower default value was looking good locally, but
* I'm concerned about the implications of effectively using this all the time.
* </p>
*/
public static final Setting<Double> STORED_FIELDS_SEQUENTIAL_PROPORTION = Setting.doubleSetting(
"index.esql.stored_fields_sequential_proportion",
0.20,
0,
1,
Setting.Property.IndexScope,
Setting.Property.Dynamic
);
@Override
public Collection<?> createComponents(PluginServices services) {
CircuitBreaker circuitBreaker = services.indicesService().getBigArrays().breakerService().getBreaker("request");
@ -226,7 +249,8 @@ public class EsqlPlugin extends Plugin implements ActionPlugin {
ESQL_QUERYLOG_THRESHOLD_INFO_SETTING,
ESQL_QUERYLOG_THRESHOLD_WARN_SETTING,
ESQL_QUERYLOG_INCLUDE_USER_SETTING,
DEFAULT_DATA_PARTITIONING
DEFAULT_DATA_PARTITIONING,
STORED_FIELDS_SEQUENTIAL_PROPORTION
);
}