Avoid sorted source for time_series aggs without rates (#127033)
With this change, `TS index | STATS ...` is translated to `FROM index METADATA _tsid | STATS ...` when the query uses no rate aggregations. This avoids emitting docs in _tsid and timestamp order, which is expensive. For example, it reduces the execution time of the following query on the tsdb track from 50 seconds to 4.5 seconds: `TS tsdb | STATS sum(max_over_time(kubernetes.container.memory.usage.bytes)) BY bucket(@timestamp, 5minute)`
This commit is contained in:
parent
c2fdc06465
commit
09541c596a
|
@ -132,6 +132,11 @@ public class TimeSeriesIdFieldMapper extends MetadataFieldMapper {
|
||||||
public Query termQuery(Object value, SearchExecutionContext context) {
|
public Query termQuery(Object value, SearchExecutionContext context) {
|
||||||
throw new IllegalArgumentException("[" + NAME + "] is not searchable");
|
throw new IllegalArgumentException("[" + NAME + "] is not searchable");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public BlockLoader blockLoader(BlockLoaderContext blContext) {
|
||||||
|
return new BlockDocValuesReader.BytesRefsFromOrdsBlockLoader(name());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private final boolean useDocValuesSkipper;
|
private final boolean useDocValuesSkipper;
|
||||||
|
|
|
@ -7,6 +7,7 @@
|
||||||
|
|
||||||
package org.elasticsearch.xpack.esql.optimizer.rules.logical;
|
package org.elasticsearch.xpack.esql.optimizer.rules.logical;
|
||||||
|
|
||||||
|
import org.elasticsearch.index.IndexMode;
|
||||||
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
|
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
|
||||||
import org.elasticsearch.xpack.esql.core.expression.Alias;
|
import org.elasticsearch.xpack.esql.core.expression.Alias;
|
||||||
import org.elasticsearch.xpack.esql.core.expression.Attribute;
|
import org.elasticsearch.xpack.esql.core.expression.Attribute;
|
||||||
|
@ -19,6 +20,7 @@ import org.elasticsearch.xpack.esql.core.util.CollectionUtils;
|
||||||
import org.elasticsearch.xpack.esql.core.util.Holder;
|
import org.elasticsearch.xpack.esql.core.util.Holder;
|
||||||
import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction;
|
import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction;
|
||||||
import org.elasticsearch.xpack.esql.expression.function.aggregate.FromPartial;
|
import org.elasticsearch.xpack.esql.expression.function.aggregate.FromPartial;
|
||||||
|
import org.elasticsearch.xpack.esql.expression.function.aggregate.Rate;
|
||||||
import org.elasticsearch.xpack.esql.expression.function.aggregate.TimeSeriesAggregateFunction;
|
import org.elasticsearch.xpack.esql.expression.function.aggregate.TimeSeriesAggregateFunction;
|
||||||
import org.elasticsearch.xpack.esql.expression.function.aggregate.ToPartial;
|
import org.elasticsearch.xpack.esql.expression.function.aggregate.ToPartial;
|
||||||
import org.elasticsearch.xpack.esql.expression.function.aggregate.Values;
|
import org.elasticsearch.xpack.esql.expression.function.aggregate.Values;
|
||||||
|
@ -120,7 +122,7 @@ import java.util.Map;
|
||||||
*
|
*
|
||||||
* becomes
|
* becomes
|
||||||
*
|
*
|
||||||
* TS k8s
|
* FROM k8s
|
||||||
* | STATS max_memory_usage = max(memory_usage), host_values=VALUES(host) BY _tsid, time_bucket=bucket(@timestamp, 1minute)
|
* | STATS max_memory_usage = max(memory_usage), host_values=VALUES(host) BY _tsid, time_bucket=bucket(@timestamp, 1minute)
|
||||||
* | STATS sum(max_memory_usage) BY host_values, time_bucket
|
* | STATS sum(max_memory_usage) BY host_values, time_bucket
|
||||||
*
|
*
|
||||||
|
@ -129,7 +131,7 @@ import java.util.Map;
|
||||||
*
|
*
|
||||||
* becomes
|
* becomes
|
||||||
*
|
*
|
||||||
* TS k8s
|
* FROM k8s
|
||||||
* | STATS avg_memory_usage = avg(memory_usage), host_values=VALUES(host) BY _tsid, time_bucket=bucket(@timestamp, 1minute)
|
* | STATS avg_memory_usage = avg(memory_usage), host_values=VALUES(host) BY _tsid, time_bucket=bucket(@timestamp, 1minute)
|
||||||
* | STATS sum(avg_memory_usage) BY host_values, time_bucket
|
* | STATS sum(avg_memory_usage) BY host_values, time_bucket
|
||||||
*
|
*
|
||||||
|
@ -154,11 +156,15 @@ public final class TranslateTimeSeriesAggregate extends OptimizerRules.Optimizer
|
||||||
Map<AggregateFunction, Alias> timeSeriesAggs = new HashMap<>();
|
Map<AggregateFunction, Alias> timeSeriesAggs = new HashMap<>();
|
||||||
List<NamedExpression> firstPassAggs = new ArrayList<>();
|
List<NamedExpression> firstPassAggs = new ArrayList<>();
|
||||||
List<NamedExpression> secondPassAggs = new ArrayList<>();
|
List<NamedExpression> secondPassAggs = new ArrayList<>();
|
||||||
|
Holder<Boolean> hasRateAggregates = new Holder<>(Boolean.FALSE);
|
||||||
for (NamedExpression agg : aggregate.aggregates()) {
|
for (NamedExpression agg : aggregate.aggregates()) {
|
||||||
if (agg instanceof Alias alias && alias.child() instanceof AggregateFunction af) {
|
if (agg instanceof Alias alias && alias.child() instanceof AggregateFunction af) {
|
||||||
Holder<Boolean> changed = new Holder<>(Boolean.FALSE);
|
Holder<Boolean> changed = new Holder<>(Boolean.FALSE);
|
||||||
Expression outerAgg = af.transformDown(TimeSeriesAggregateFunction.class, tsAgg -> {
|
Expression outerAgg = af.transformDown(TimeSeriesAggregateFunction.class, tsAgg -> {
|
||||||
changed.set(Boolean.TRUE);
|
changed.set(Boolean.TRUE);
|
||||||
|
if (tsAgg instanceof Rate) {
|
||||||
|
hasRateAggregates.set(Boolean.TRUE);
|
||||||
|
}
|
||||||
AggregateFunction firstStageFn = tsAgg.perTimeSeriesAggregation();
|
AggregateFunction firstStageFn = tsAgg.perTimeSeriesAggregation();
|
||||||
Alias newAgg = timeSeriesAggs.computeIfAbsent(firstStageFn, k -> {
|
Alias newAgg = timeSeriesAggs.computeIfAbsent(firstStageFn, k -> {
|
||||||
Alias firstStageAlias = new Alias(tsAgg.source(), agg.name(), firstStageFn);
|
Alias firstStageAlias = new Alias(tsAgg.source(), agg.name(), firstStageFn);
|
||||||
|
@ -231,16 +237,17 @@ public final class TranslateTimeSeriesAggregate extends OptimizerRules.Optimizer
|
||||||
secondPassGroupings.add(new Alias(g.source(), g.name(), newFinalGroup.toAttribute(), g.id()));
|
secondPassGroupings.add(new Alias(g.source(), g.name(), newFinalGroup.toAttribute(), g.id()));
|
||||||
}
|
}
|
||||||
LogicalPlan newChild = aggregate.child().transformUp(EsRelation.class, r -> {
|
LogicalPlan newChild = aggregate.child().transformUp(EsRelation.class, r -> {
|
||||||
|
IndexMode indexMode = hasRateAggregates.get() ? r.indexMode() : IndexMode.STANDARD;
|
||||||
if (r.output().contains(tsid.get()) == false) {
|
if (r.output().contains(tsid.get()) == false) {
|
||||||
return new EsRelation(
|
return new EsRelation(
|
||||||
r.source(),
|
r.source(),
|
||||||
r.indexPattern(),
|
r.indexPattern(),
|
||||||
r.indexMode(),
|
indexMode,
|
||||||
r.indexNameWithModes(),
|
r.indexNameWithModes(),
|
||||||
CollectionUtils.combine(r.output(), tsid.get())
|
CollectionUtils.combine(r.output(), tsid.get())
|
||||||
);
|
);
|
||||||
} else {
|
} else {
|
||||||
return r;
|
return new EsRelation(r.source(), r.indexPattern(), indexMode, r.indexNameWithModes(), r.output());
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
final var firstPhase = new TimeSeriesAggregate(
|
final var firstPhase = new TimeSeriesAggregate(
|
||||||
|
|
|
@ -6847,7 +6847,8 @@ public class LogicalPlanOptimizerTests extends ESTestCase {
|
||||||
Eval addEval = as(aggsByTsid.child(), Eval.class);
|
Eval addEval = as(aggsByTsid.child(), Eval.class);
|
||||||
assertThat(addEval.fields(), hasSize(1));
|
assertThat(addEval.fields(), hasSize(1));
|
||||||
Add add = as(Alias.unwrap(addEval.fields().get(0)), Add.class);
|
Add add = as(Alias.unwrap(addEval.fields().get(0)), Add.class);
|
||||||
as(addEval.child(), EsRelation.class);
|
EsRelation relation = as(addEval.child(), EsRelation.class);
|
||||||
|
assertThat(relation.indexMode(), equalTo(IndexMode.TIME_SERIES));
|
||||||
|
|
||||||
assertThat(Expressions.attribute(mul.left()).id(), equalTo(finalAggs.aggregates().get(1).id()));
|
assertThat(Expressions.attribute(mul.left()).id(), equalTo(finalAggs.aggregates().get(1).id()));
|
||||||
assertThat(mul.right().fold(FoldContext.small()), equalTo(1.1));
|
assertThat(mul.right().fold(FoldContext.small()), equalTo(1.1));
|
||||||
|
@ -6877,7 +6878,8 @@ public class LogicalPlanOptimizerTests extends ESTestCase {
|
||||||
TimeSeriesAggregate aggsByTsid = as(aggsByCluster.child(), TimeSeriesAggregate.class);
|
TimeSeriesAggregate aggsByTsid = as(aggsByCluster.child(), TimeSeriesAggregate.class);
|
||||||
assertThat(aggsByTsid.aggregates(), hasSize(2)); // _tsid is dropped
|
assertThat(aggsByTsid.aggregates(), hasSize(2)); // _tsid is dropped
|
||||||
assertNull(aggsByTsid.timeBucket());
|
assertNull(aggsByTsid.timeBucket());
|
||||||
as(aggsByTsid.child(), EsRelation.class);
|
EsRelation relation = as(aggsByTsid.child(), EsRelation.class);
|
||||||
|
assertThat(relation.indexMode(), equalTo(IndexMode.TIME_SERIES));
|
||||||
|
|
||||||
Sum sum = as(Alias.unwrap(aggsByCluster.aggregates().get(0)), Sum.class);
|
Sum sum = as(Alias.unwrap(aggsByCluster.aggregates().get(0)), Sum.class);
|
||||||
assertThat(Expressions.attribute(sum.field()).id(), equalTo(aggsByTsid.aggregates().get(0).id()));
|
assertThat(Expressions.attribute(sum.field()).id(), equalTo(aggsByTsid.aggregates().get(0).id()));
|
||||||
|
@ -6904,7 +6906,8 @@ public class LogicalPlanOptimizerTests extends ESTestCase {
|
||||||
TimeSeriesAggregate aggsByTsid = as(finalAggs.child(), TimeSeriesAggregate.class);
|
TimeSeriesAggregate aggsByTsid = as(finalAggs.child(), TimeSeriesAggregate.class);
|
||||||
assertThat(aggsByTsid.aggregates(), hasSize(3)); // _tsid is dropped
|
assertThat(aggsByTsid.aggregates(), hasSize(3)); // _tsid is dropped
|
||||||
assertNull(aggsByTsid.timeBucket());
|
assertNull(aggsByTsid.timeBucket());
|
||||||
as(aggsByTsid.child(), EsRelation.class);
|
EsRelation relation = as(aggsByTsid.child(), EsRelation.class);
|
||||||
|
assertThat(relation.indexMode(), equalTo(IndexMode.TIME_SERIES));
|
||||||
|
|
||||||
Div div = as(Alias.unwrap(eval.fields().get(0)), Div.class);
|
Div div = as(Alias.unwrap(eval.fields().get(0)), Div.class);
|
||||||
assertThat(Expressions.attribute(div.left()).id(), equalTo(finalAggs.aggregates().get(0).id()));
|
assertThat(Expressions.attribute(div.left()).id(), equalTo(finalAggs.aggregates().get(0).id()));
|
||||||
|
@ -6943,7 +6946,8 @@ public class LogicalPlanOptimizerTests extends ESTestCase {
|
||||||
assertThat(aggsByTsid.timeBucket().buckets().fold(FoldContext.small()), equalTo(Duration.ofHours(1)));
|
assertThat(aggsByTsid.timeBucket().buckets().fold(FoldContext.small()), equalTo(Duration.ofHours(1)));
|
||||||
Eval eval = as(aggsByTsid.child(), Eval.class);
|
Eval eval = as(aggsByTsid.child(), Eval.class);
|
||||||
assertThat(eval.fields(), hasSize(1));
|
assertThat(eval.fields(), hasSize(1));
|
||||||
as(eval.child(), EsRelation.class);
|
EsRelation relation = as(eval.child(), EsRelation.class);
|
||||||
|
assertThat(relation.indexMode(), equalTo(IndexMode.TIME_SERIES));
|
||||||
|
|
||||||
Sum sum = as(Alias.unwrap(finalAgg.aggregates().get(0)), Sum.class);
|
Sum sum = as(Alias.unwrap(finalAgg.aggregates().get(0)), Sum.class);
|
||||||
assertThat(Expressions.attribute(sum.field()).id(), equalTo(aggsByTsid.aggregates().get(0).id()));
|
assertThat(Expressions.attribute(sum.field()).id(), equalTo(aggsByTsid.aggregates().get(0).id()));
|
||||||
|
@ -6977,7 +6981,8 @@ public class LogicalPlanOptimizerTests extends ESTestCase {
|
||||||
assertNotNull(aggsByTsid.timeBucket());
|
assertNotNull(aggsByTsid.timeBucket());
|
||||||
assertThat(aggsByTsid.timeBucket().buckets().fold(FoldContext.small()), equalTo(Duration.ofMinutes(5)));
|
assertThat(aggsByTsid.timeBucket().buckets().fold(FoldContext.small()), equalTo(Duration.ofMinutes(5)));
|
||||||
Eval bucket = as(aggsByTsid.child(), Eval.class);
|
Eval bucket = as(aggsByTsid.child(), Eval.class);
|
||||||
as(bucket.child(), EsRelation.class);
|
EsRelation relation = as(bucket.child(), EsRelation.class);
|
||||||
|
assertThat(relation.indexMode(), equalTo(IndexMode.TIME_SERIES));
|
||||||
assertThat(Expressions.attribute(div.left()).id(), equalTo(finalAgg.aggregates().get(0).id()));
|
assertThat(Expressions.attribute(div.left()).id(), equalTo(finalAgg.aggregates().get(0).id()));
|
||||||
assertThat(Expressions.attribute(div.right()).id(), equalTo(finalAgg.aggregates().get(1).id()));
|
assertThat(Expressions.attribute(div.right()).id(), equalTo(finalAgg.aggregates().get(1).id()));
|
||||||
|
|
||||||
|
@ -7018,7 +7023,8 @@ public class LogicalPlanOptimizerTests extends ESTestCase {
|
||||||
assertNotNull(aggsByTsid.timeBucket());
|
assertNotNull(aggsByTsid.timeBucket());
|
||||||
assertThat(aggsByTsid.timeBucket().buckets().fold(FoldContext.small()), equalTo(Duration.ofMinutes(5)));
|
assertThat(aggsByTsid.timeBucket().buckets().fold(FoldContext.small()), equalTo(Duration.ofMinutes(5)));
|
||||||
Eval bucket = as(aggsByTsid.child(), Eval.class);
|
Eval bucket = as(aggsByTsid.child(), Eval.class);
|
||||||
as(bucket.child(), EsRelation.class);
|
EsRelation relation = as(bucket.child(), EsRelation.class);
|
||||||
|
assertThat(relation.indexMode(), equalTo(IndexMode.TIME_SERIES));
|
||||||
assertThat(Expressions.attribute(div.left()).id(), equalTo(finalAgg.aggregates().get(0).id()));
|
assertThat(Expressions.attribute(div.left()).id(), equalTo(finalAgg.aggregates().get(0).id()));
|
||||||
assertThat(Expressions.attribute(div.right()).id(), equalTo(finalAgg.aggregates().get(1).id()));
|
assertThat(Expressions.attribute(div.right()).id(), equalTo(finalAgg.aggregates().get(1).id()));
|
||||||
|
|
||||||
|
@ -7082,7 +7088,8 @@ public class LogicalPlanOptimizerTests extends ESTestCase {
|
||||||
Eval evalBucket = as(aggsByTsid.child(), Eval.class);
|
Eval evalBucket = as(aggsByTsid.child(), Eval.class);
|
||||||
assertThat(evalBucket.fields(), hasSize(1));
|
assertThat(evalBucket.fields(), hasSize(1));
|
||||||
Bucket bucket = as(Alias.unwrap(evalBucket.fields().get(0)), Bucket.class);
|
Bucket bucket = as(Alias.unwrap(evalBucket.fields().get(0)), Bucket.class);
|
||||||
as(evalBucket.child(), EsRelation.class);
|
EsRelation relation = as(evalBucket.child(), EsRelation.class);
|
||||||
|
assertThat(relation.indexMode(), equalTo(IndexMode.TIME_SERIES));
|
||||||
|
|
||||||
assertThat(Expressions.attribute(div.left()).id(), equalTo(finalAgg.aggregates().get(0).id()));
|
assertThat(Expressions.attribute(div.left()).id(), equalTo(finalAgg.aggregates().get(0).id()));
|
||||||
assertThat(Expressions.attribute(div.right()).id(), equalTo(finalAgg.aggregates().get(1).id()));
|
assertThat(Expressions.attribute(div.right()).id(), equalTo(finalAgg.aggregates().get(1).id()));
|
||||||
|
@ -7120,7 +7127,8 @@ public class LogicalPlanOptimizerTests extends ESTestCase {
|
||||||
assertThat(aggsByTsid.timeBucket().buckets().fold(FoldContext.small()), equalTo(Duration.ofHours(1)));
|
assertThat(aggsByTsid.timeBucket().buckets().fold(FoldContext.small()), equalTo(Duration.ofHours(1)));
|
||||||
Eval eval = as(aggsByTsid.child(), Eval.class);
|
Eval eval = as(aggsByTsid.child(), Eval.class);
|
||||||
assertThat(eval.fields(), hasSize(1));
|
assertThat(eval.fields(), hasSize(1));
|
||||||
as(eval.child(), EsRelation.class);
|
EsRelation relation = as(eval.child(), EsRelation.class);
|
||||||
|
assertThat(relation.indexMode(), equalTo(IndexMode.STANDARD));
|
||||||
|
|
||||||
Sum sum = as(Alias.unwrap(finalAgg.aggregates().get(0)), Sum.class);
|
Sum sum = as(Alias.unwrap(finalAgg.aggregates().get(0)), Sum.class);
|
||||||
assertThat(Expressions.attribute(sum.field()).id(), equalTo(aggsByTsid.aggregates().get(0).id()));
|
assertThat(Expressions.attribute(sum.field()).id(), equalTo(aggsByTsid.aggregates().get(0).id()));
|
||||||
|
@ -7149,7 +7157,8 @@ public class LogicalPlanOptimizerTests extends ESTestCase {
|
||||||
assertThat(aggsByTsid.timeBucket().buckets().fold(FoldContext.small()), equalTo(Duration.ofHours(1)));
|
assertThat(aggsByTsid.timeBucket().buckets().fold(FoldContext.small()), equalTo(Duration.ofHours(1)));
|
||||||
Eval evalBucket = as(aggsByTsid.child(), Eval.class);
|
Eval evalBucket = as(aggsByTsid.child(), Eval.class);
|
||||||
assertThat(evalBucket.fields(), hasSize(1));
|
assertThat(evalBucket.fields(), hasSize(1));
|
||||||
as(evalBucket.child(), EsRelation.class);
|
EsRelation relation = as(evalBucket.child(), EsRelation.class);
|
||||||
|
assertThat(relation.indexMode(), equalTo(IndexMode.STANDARD));
|
||||||
|
|
||||||
Sum sum = as(Alias.unwrap(finalAgg.aggregates().get(0)), Sum.class);
|
Sum sum = as(Alias.unwrap(finalAgg.aggregates().get(0)), Sum.class);
|
||||||
assertThat(Expressions.attribute(sum.field()).id(), equalTo(evalAvg.fields().get(0).id()));
|
assertThat(Expressions.attribute(sum.field()).id(), equalTo(evalAvg.fields().get(0).id()));
|
||||||
|
|
Loading…
Reference in New Issue