ESQL: Split grouping functions based on their EVAL-ability (#126597)

This splits the grouping functions in two: those that can be evaluated independently through the EVAL operator (`BUCKET`) and those that don't (like those that that are evaluated through an agg operator, `CATEGORIZE`).

Closes #124608
This commit is contained in:
Bogdan Pintea 2025-04-11 16:19:54 +02:00 committed by GitHub
parent f658af6628
commit 9784e0ec5f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 168 additions and 104 deletions

View File

@ -7,6 +7,6 @@ Groups text messages into categories of similarly formatted text values.
`CATEGORIZE` has the following limitations:
* cant be used within other expressions
* cant be used with multiple groupings
* cant be used or referenced within aggregate functions
* cant be used more than once in the groupings
* cant be used or referenced within aggregate functions and it has to be the first grouping

View File

@ -584,6 +584,21 @@ c:long
3
;
reuse categorize arg expression in agg
required_capability: categorize_v5
FROM sample_data
| STATS m = MAX(LENGTH(CONCAT(message, "_end"))) BY c = CATEGORIZE(CONCAT(message, "_end"))
| SORT m
;
m:integer |c:keyword
16 |.*?Disconnected_end.*?
20 |.*?Connection.+?error_end.*?
25 |.*?Connected.+?to.*?
;
categorize in aggs inside function
required_capability: categorize_v5

View File

@ -58,7 +58,10 @@ import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.dateTimeTo
* from a number of desired buckets (as a hint) and a range (auto mode).
* In the former case, two parameters will be provided, in the latter four.
*/
public class Bucket extends GroupingFunction implements PostOptimizationVerificationAware, TwoOptionalArguments {
public class Bucket extends GroupingFunction.EvaluatableGroupingFunction
implements
PostOptimizationVerificationAware,
TwoOptionalArguments {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "Bucket", Bucket::new);
// TODO maybe we should just cover the whole of representable dates here - like ten years, 100 years, 1000 years, all the way up.

View File

@ -10,7 +10,6 @@ package org.elasticsearch.xpack.esql.expression.function.grouping;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.compute.operator.EvalOperator.ExpressionEvaluator;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.Nullability;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
@ -37,7 +36,7 @@ import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isStr
* For the implementation, see {@link org.elasticsearch.compute.aggregation.blockhash.CategorizeBlockHash}
* </p>
*/
public class Categorize extends GroupingFunction {
public class Categorize extends GroupingFunction.NonEvaluatableGroupingFunction {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
Expression.class,
"Categorize",
@ -53,8 +52,8 @@ public class Categorize extends GroupingFunction {
`CATEGORIZE` has the following limitations:
* cant be used within other expressions
* cant be used with multiple groupings
* cant be used or referenced within aggregate functions""",
* cant be used more than once in the groupings
* cant be used or referenced within aggregate functions and it has to be the first grouping""",
examples = {
@Example(
file = "docs",
@ -101,11 +100,6 @@ public class Categorize extends GroupingFunction {
return Nullability.TRUE;
}
@Override
public ExpressionEvaluator.Factory toEvaluator(ToEvaluator toEvaluator) {
throw new UnsupportedOperationException("CATEGORIZE is only evaluated during aggregations");
}
@Override
protected TypeResolution resolveType() {
return isString(field(), sourceText(), DEFAULT);

View File

@ -22,17 +22,13 @@ import java.util.function.BiConsumer;
import static org.elasticsearch.xpack.esql.common.Failure.fail;
public abstract class GroupingFunction extends Function implements EvaluatorMapper, PostAnalysisPlanVerificationAware {
public abstract sealed class GroupingFunction extends Function implements PostAnalysisPlanVerificationAware permits
GroupingFunction.NonEvaluatableGroupingFunction, GroupingFunction.EvaluatableGroupingFunction {
protected GroupingFunction(Source source, List<Expression> fields) {
super(source, fields);
}
@Override
public Object fold(FoldContext ctx) {
return EvaluatorMapper.super.fold(source(), ctx);
}
@Override
public BiConsumer<LogicalPlan, Failures> postAnalysisPlanVerification() {
return (p, failures) -> {
@ -45,4 +41,29 @@ public abstract class GroupingFunction extends Function implements EvaluatorMapp
};
}
/**
* This is a class of grouping functions that cannot be evaluated outside the context of an aggregation.
* They will have their evaluation implemented part of an aggregation, which may keep state for their execution, making them "stateful"
* grouping functions.
*/
public abstract static non-sealed class NonEvaluatableGroupingFunction extends GroupingFunction {
protected NonEvaluatableGroupingFunction(Source source, List<Expression> fields) {
super(source, fields);
}
}
/**
* This is a class of grouping functions that can be evaluated independently within an EVAL operator, independent of the aggregation
* they're used by.
*/
public abstract static non-sealed class EvaluatableGroupingFunction extends GroupingFunction implements EvaluatorMapper {
protected EvaluatableGroupingFunction(Source source, List<Expression> fields) {
super(source, fields);
}
@Override
public Object fold(FoldContext ctx) {
return EvaluatorMapper.super.fold(source(), ctx);
}
}
}

View File

@ -15,7 +15,7 @@ import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.Expressions;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
import org.elasticsearch.xpack.esql.expression.function.grouping.Categorize;
import org.elasticsearch.xpack.esql.expression.function.grouping.GroupingFunction;
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.Project;
@ -67,11 +67,11 @@ public final class CombineProjections extends OptimizerRules.OptimizerRule<Unary
for (Expression grouping : groupings) {
if (grouping instanceof Attribute attribute) {
groupingAttrs.add(attribute);
} else if (grouping instanceof Alias as && as.child() instanceof Categorize) {
} else if (grouping instanceof Alias as && as.child() instanceof GroupingFunction.NonEvaluatableGroupingFunction) {
groupingAttrs.add(as);
} else {
// After applying ReplaceAggregateNestedExpressionWithEval,
// groupings (except Categorize) can only contain attributes.
// evaluatable groupings can only contain attributes.
throw new EsqlIllegalArgumentException("Expected an Attribute, got {}", grouping);
}
}
@ -145,7 +145,8 @@ public final class CombineProjections extends OptimizerRules.OptimizerRule<Unary
List<? extends NamedExpression> lowerProjections
) {
assert upperGroupings.size() <= 1
|| upperGroupings.stream().anyMatch(group -> group.anyMatch(expr -> expr instanceof Categorize)) == false
|| upperGroupings.stream()
.anyMatch(group -> group.anyMatch(expr -> expr instanceof GroupingFunction.NonEvaluatableGroupingFunction)) == false
: "CombineProjections only tested with a single CATEGORIZE with no additional groups";
// Collect the alias map for resolving the source (f1 = 1, f2 = f1, etc..)
AttributeMap.Builder<Attribute> aliasesBuilder = AttributeMap.builder();
@ -159,10 +160,10 @@ public final class CombineProjections extends OptimizerRules.OptimizerRule<Unary
// Propagate any renames from the lower projection into the upper groupings.
// This can lead to duplicates: e.g.
// | EVAL x = y | STATS ... BY x, y
// All substitutions happen before; groupings must be attributes at this point except for CATEGORIZE which will be an alias like
// `c = CATEGORIZE(attribute)`.
// All substitutions happen before; groupings must be attributes at this point except for non-evaluatable groupings which will be
// an alias like `c = CATEGORIZE(attribute)`.
// Therefore, it is correct to deduplicate based on simple equality (based on names) instead of name ids (Set vs. AttributeSet).
// TODO: The deduplication based on simple equality will be insufficient in case of multiple CATEGORIZEs, e.g. for
// TODO: The deduplication based on simple equality will be insufficient in case of multiple non-evaluatable groupings, e.g. for
// `| EVAL x = y | STATS ... BY CATEGORIZE(x), CATEGORIZE(y)`. That will require semantic equality instead.
LinkedHashSet<NamedExpression> resolvedGroupings = new LinkedHashSet<>();
for (NamedExpression ne : upperGroupings) {

View File

@ -13,7 +13,7 @@ import org.elasticsearch.xpack.esql.core.expression.Expressions;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.expression.Nullability;
import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction;
import org.elasticsearch.xpack.esql.expression.function.grouping.Categorize;
import org.elasticsearch.xpack.esql.expression.function.grouping.GroupingFunction;
import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.In;
import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
@ -43,9 +43,9 @@ public class FoldNull extends OptimizerRules.OptimizerExpressionRule<Expression>
return Literal.of(in, null);
}
} else if (e instanceof Alias == false && e.nullable() == Nullability.TRUE
// Categorize function stays as a STATS grouping (It isn't moved to an early EVAL like other groupings),
// Non-evaluatable functions stay as a STATS grouping (It isn't moved to an early EVAL like other groupings),
// so folding it to null would currently break the plan, as we don't create an attribute/channel for that null value.
&& e instanceof Categorize == false
&& e instanceof GroupingFunction.NonEvaluatableGroupingFunction == false
&& Expressions.anyMatch(e.children(), Expressions::isGuaranteedNull)) {
return Literal.of(e, null);
}

View File

@ -16,7 +16,7 @@ import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.util.Holder;
import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction;
import org.elasticsearch.xpack.esql.expression.function.grouping.Categorize;
import org.elasticsearch.xpack.esql.expression.function.grouping.GroupingFunction;
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
import org.elasticsearch.xpack.esql.plan.logical.Eval;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
@ -50,20 +50,20 @@ public final class ReplaceAggregateAggExpressionWithEval extends OptimizerRules.
@Override
protected LogicalPlan rule(Aggregate aggregate) {
// build alias map
// an alias map for evaluatable grouping functions
AttributeMap.Builder<Expression> aliasesBuilder = AttributeMap.builder();
aggregate.forEachExpressionUp(Alias.class, a -> aliasesBuilder.put(a.toAttribute(), a.child()));
var aliases = aliasesBuilder.build();
// Build Categorize grouping functions map.
// Functions like BUCKET() shouldn't reach this point,
// as they are moved to an early EVAL by ReplaceAggregateNestedExpressionWithEval
Map<Categorize, Attribute> groupingAttributes = new HashMap<>();
// a function map for non-evaluatable grouping functions
Map<GroupingFunction.NonEvaluatableGroupingFunction, Attribute> nonEvalGroupingAttributes = new HashMap<>(
aggregate.groupings().size()
);
aggregate.forEachExpressionUp(Alias.class, a -> {
if (a.child() instanceof Categorize groupingFunction) {
groupingAttributes.put(groupingFunction, a.toAttribute());
if (a.child() instanceof GroupingFunction.NonEvaluatableGroupingFunction groupingFunction) {
nonEvalGroupingAttributes.put(groupingFunction, a.toAttribute());
} else {
aliasesBuilder.put(a.toAttribute(), a.child());
}
});
var aliases = aliasesBuilder.build();
// break down each aggregate into AggregateFunction and/or grouping key
// preserve the projection at the end
@ -123,8 +123,11 @@ public final class ReplaceAggregateAggExpressionWithEval extends OptimizerRules.
return alias.toAttribute();
});
// replace grouping functions with their references
aggExpression = aggExpression.transformUp(Categorize.class, groupingAttributes::get);
// replace non-evaluatable grouping functions with their references
aggExpression = aggExpression.transformUp(
GroupingFunction.NonEvaluatableGroupingFunction.class,
nonEvalGroupingAttributes::get
);
Alias alias = as.replaceChild(aggExpression);
newEvals.add(alias);
@ -152,7 +155,7 @@ public final class ReplaceAggregateAggExpressionWithEval extends OptimizerRules.
return plan;
}
static String syntheticName(Expression expression, Expression af, int counter) {
private static String syntheticName(Expression expression, Expression af, int counter) {
return TemporaryNameUtils.temporaryName(expression, af, counter);
}
}

View File

@ -13,7 +13,6 @@ import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
import org.elasticsearch.xpack.esql.core.util.Holder;
import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction;
import org.elasticsearch.xpack.esql.expression.function.grouping.Categorize;
import org.elasticsearch.xpack.esql.expression.function.grouping.GroupingFunction;
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
import org.elasticsearch.xpack.esql.plan.logical.Eval;
@ -44,30 +43,27 @@ public final class ReplaceAggregateNestedExpressionWithEval extends OptimizerRul
List<Expression> newGroupings = new ArrayList<>(aggregate.groupings());
boolean groupingChanged = false;
// start with the groupings since the aggs might duplicate it
// start with the groupings since the aggs might reuse/reference them
for (int i = 0, s = newGroupings.size(); i < s; i++) {
Expression g = newGroupings.get(i);
// Move the alias into an eval and replace it with its attribute.
// Exception: Categorize is internal to the aggregation and remains in the groupings. We move its child expression into an eval.
if (g instanceof Alias as) {
if (as.child() instanceof Categorize cat) {
// For Categorize grouping function, we only move the child expression into an eval
if (cat.field() instanceof Attribute == false) {
Expression asChild = as.child();
// for non-evaluable grouping functions, replace their nested expressions with attributes and extract the expression out
// into an eval (added later below)
if (asChild instanceof GroupingFunction.NonEvaluatableGroupingFunction gf) {
Expression newGroupingFunction = transformNonEvaluatableGroupingFunction(gf, evals);
if (newGroupingFunction != gf) {
groupingChanged = true;
var fieldAs = new Alias(as.source(), as.name(), cat.field(), null, true);
var fieldAttr = fieldAs.toAttribute();
evals.add(fieldAs);
evalNames.put(fieldAs.name(), fieldAttr);
Categorize replacement = cat.replaceChildren(List.of(fieldAttr));
newGroupings.set(i, as.replaceChild(replacement));
newGroupings.set(i, as.replaceChild(newGroupingFunction));
}
} else {
// Move the alias into an eval and replace it with its attribute.
groupingChanged = true;
var attr = as.toAttribute();
evals.add(as);
evalNames.put(as.name(), attr);
newGroupings.set(i, attr);
if (as.child() instanceof GroupingFunction gf) {
if (asChild instanceof GroupingFunction.EvaluatableGroupingFunction gf) {
groupingAttributes.put(gf, attr);
}
}
@ -91,17 +87,7 @@ public final class ReplaceAggregateNestedExpressionWithEval extends OptimizerRul
// if the child is a nested expression
Expression child = as.child();
// do not replace nested aggregates
if (child instanceof AggregateFunction af) {
Holder<Boolean> foundNestedAggs = new Holder<>(Boolean.FALSE);
af.children().forEach(e -> e.forEachDown(AggregateFunction.class, unused -> foundNestedAggs.set(Boolean.TRUE)));
if (foundNestedAggs.get()) {
return as;
}
}
// shortcut for common scenario
if (child instanceof AggregateFunction af && af.field() instanceof Attribute) {
if (child instanceof AggregateFunction af && skipOptimisingAgg(af)) {
return as;
}
@ -112,33 +98,13 @@ public final class ReplaceAggregateNestedExpressionWithEval extends OptimizerRul
return ref;
}
// 1. look for the aggregate function
var replaced = child.transformUp(AggregateFunction.class, af -> {
Expression result = af;
Expression field = af.field();
// 2. if the field is a nested expression (not attribute or literal), replace it
if (field instanceof Attribute == false && field.foldable() == false) {
// 3. create a new alias if one doesn't exist yet no reference
Attribute attr = expToAttribute.computeIfAbsent(field.canonical(), k -> {
Alias newAlias = new Alias(k.source(), syntheticName(k, af, counter[0]++), k, null, true);
evals.add(newAlias);
return newAlias.toAttribute();
});
aggsChanged.set(true);
// replace field with attribute
List<Expression> newChildren = new ArrayList<>(af.children());
newChildren.set(0, attr);
result = af.replaceChildren(newChildren);
}
return result;
});
// replace any grouping functions with their references pointing to the added synthetic eval
replaced = replaced.transformDown(GroupingFunction.class, gf -> {
// Categorize in aggs depends on the grouping result, not on an early eval
if (gf instanceof Categorize) {
return gf;
}
// look for the aggregate function
var replaced = child.transformUp(
AggregateFunction.class,
af -> transformAggregateFunction(af, expToAttribute, evals, counter, aggsChanged)
);
// replace any evaluatable grouping functions with their references pointing to the added synthetic eval
replaced = replaced.transformDown(GroupingFunction.EvaluatableGroupingFunction.class, gf -> {
aggsChanged.set(true);
// should never return null, as it's verified.
// but even if broken, the transform will fail safely; otoh, returning `gf` will fail later due to incorrect plan.
@ -159,10 +125,71 @@ public final class ReplaceAggregateNestedExpressionWithEval extends OptimizerRul
aggregate = aggregate.with(newEval, groupings, aggregates);
}
return (LogicalPlan) aggregate;
return aggregate;
}
static String syntheticName(Expression expression, AggregateFunction af, int counter) {
return TemporaryNameUtils.temporaryName(expression, af, counter);
private static Expression transformNonEvaluatableGroupingFunction(
GroupingFunction.NonEvaluatableGroupingFunction gf,
List<Alias> evals
) {
int counter = 0;
boolean childrenChanged = false;
List<Expression> newChildren = new ArrayList<>(gf.children().size());
for (Expression ex : gf.children()) {
if (ex instanceof Attribute == false) { // TODO: foldables shouldn't require eval'ing either
var alias = new Alias(ex.source(), syntheticName(ex, gf, counter++), ex, null, true);
evals.add(alias);
newChildren.add(alias.toAttribute());
childrenChanged = true;
} else {
newChildren.add(ex);
}
}
return childrenChanged ? gf.replaceChildren(newChildren) : gf;
}
private static boolean skipOptimisingAgg(AggregateFunction af) {
// shortcut for the common scenario
if (af.field() instanceof Attribute) {
return true;
}
// do not replace nested aggregates
Holder<Boolean> foundNestedAggs = new Holder<>(Boolean.FALSE);
af.field().forEachDown(AggregateFunction.class, unused -> foundNestedAggs.set(Boolean.TRUE));
return foundNestedAggs.get();
}
private static Expression transformAggregateFunction(
AggregateFunction af,
Map<Expression, Attribute> expToAttribute,
List<Alias> evals,
int[] counter,
Holder<Boolean> aggsChanged
) {
Expression result = af;
Expression field = af.field();
// if the field is a nested expression (not attribute or literal), replace it
if (field instanceof Attribute == false && field.foldable() == false) {
// create a new alias if one doesn't exist yet
Attribute attr = expToAttribute.computeIfAbsent(field.canonical(), k -> {
Alias newAlias = new Alias(k.source(), syntheticName(k, af, counter[0]++), k, null, true);
evals.add(newAlias);
return newAlias.toAttribute();
});
aggsChanged.set(true);
// replace field with attribute
List<Expression> newChildren = new ArrayList<>(af.children());
newChildren.set(0, attr);
result = af.replaceChildren(newChildren);
}
return result;
}
private static String syntheticName(Expression expression, Expression func, int counter) {
return TemporaryNameUtils.temporaryName(expression, func, counter);
}
}

View File

@ -4413,11 +4413,11 @@ public class LogicalPlanOptimizerTests extends ESTestCase {
/**
* Expects
* Limit[1000[INTEGER]]
* \_Aggregate[STANDARD,[CATEGORIZE(CATEGORIZE(CONCAT(first_name, "abc")){r$}#18) AS CATEGORIZE(CONCAT(first_name, "abc"))],[CO
* UNT(salary{f}#13,true[BOOLEAN]) AS c, CATEGORIZE(CONCAT(first_name, "abc")){r}#3]]
* \_Eval[[CONCAT(first_name{f}#9,[61 62 63][KEYWORD]) AS CATEGORIZE(CONCAT(first_name, "abc"))]]
* \_EsRelation[test][_meta_field{f}#14, emp_no{f}#8, first_name{f}#9, ge..]
* Limit[1000[INTEGER],false]
* \_Aggregate[[CATEGORIZE($$CONCAT(first_na>$CATEGORIZE(CONC>$0{r$}#1590) AS CATEGORIZE(CONCAT(first_name, "abc"))],[COUNT(sa
* lary{f}#1584,true[BOOLEAN]) AS c, CATEGORIZE(CONCAT(first_name, "abc")){r}#1574]]
* \_Eval[[CONCAT(first_name{f}#1580,[61 62 63][KEYWORD]) AS $$CONCAT(first_na>$CATEGORIZE(CONC>$0]]
* \_EsRelation[test][_meta_field{f}#1585, emp_no{f}#1579, first_name{f}#..]
*/
public void testNestedExpressionsInGroupsWithCategorize() {
var plan = optimizedPlan("""
@ -4438,10 +4438,10 @@ public class LogicalPlanOptimizerTests extends ESTestCase {
var evalFieldAlias = as(eval.fields().get(0), Alias.class);
var evalField = as(evalFieldAlias.child(), Concat.class);
assertThat(evalFieldAlias.name(), is("CATEGORIZE(CONCAT(first_name, \"abc\"))"));
assertThat(evalFieldAlias.name(), is("$$CONCAT(first_na>$CATEGORIZE(CONC>$0"));
assertThat(categorize.field(), is(evalFieldAlias.toAttribute()));
assertThat(evalField.source().text(), is("CONCAT(first_name, \"abc\")"));
assertThat(categorizeAlias.source(), is(evalFieldAlias.source()));
assertThat(categorizeAlias.source(), is(categorize.source()));
}
/**