ESQL: Add optimization to purge join on null merge key (#127583)

This adds a new logical optimization rule to purge a Join in case the
merge key(s) are null. The null detection is based on recognizing a tree
pattern where the join sits atop a project and/or eval (possibly a few
nodes deep) which contains a reference to a `null`, reference which
matches the join key.

It works at coordinator planning level, but it's most useful locally,
after insertions of `nulls` in the plan on detecting missing fields.

The Join is substituted with a projection with the same attributes as
the join, atop an eval with all join's right fields aliased to null.

Closes #125577.
This commit is contained in:
Bogdan Pintea 2025-05-27 15:38:18 +02:00 committed by GitHub
parent 52bc94e295
commit 21fe40a9b5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 433 additions and 61 deletions

View File

@ -0,0 +1,6 @@
pr: 127583
summary: Add optimization to purge join on null merge key
area: ES|QL
type: enhancement
issues:
- 125577

View File

@ -138,7 +138,7 @@ public final class Alias extends NamedExpression {
@Override
public String nodeString() {
return child.nodeString() + " AS " + name();
return child.nodeString() + " AS " + name() + "#" + id();
}
/**

View File

@ -4561,3 +4561,22 @@ language_code_float:double | language_code_double:double | language_name:keyword
2.147483648E9 | 2.147483646E9 | max_int_minus_1
2.147483648E9 | 2.147483647E9 | max_int
;
nullifiedJoinKeyToPurgeTheJoin
required_capability: join_lookup_v12
FROM employees
| RENAME languages AS language_code
| SORT emp_no, language_code
| LIMIT 4
| EVAL language_code = TO_INTEGER(NULL)
| LOOKUP JOIN languages_lookup ON language_code
| KEEP emp_no, language_code, language_name
;
emp_no:integer | language_code:integer | language_name:keyword
10001 |null |null
10002 |null |null
10003 |null |null
10004 |null |null
;

View File

@ -65,6 +65,7 @@ import org.elasticsearch.xpack.esql.optimizer.rules.logical.SubstituteSurrogateA
import org.elasticsearch.xpack.esql.optimizer.rules.logical.SubstituteSurrogateExpressions;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.SubstituteSurrogatePlans;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.TranslateTimeSeriesAggregate;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.PruneLeftJoinOnNullMatchingField;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.rule.ParameterizedRuleExecutor;
import org.elasticsearch.xpack.esql.rule.RuleExecutor;
@ -201,7 +202,8 @@ public class LogicalPlanOptimizer extends ParameterizedRuleExecutor<LogicalPlan,
new PushDownEnrich(),
new PushDownAndCombineOrderBy(),
new PruneRedundantOrderBy(),
new PruneRedundantSortClauses()
new PruneRedundantSortClauses(),
new PruneLeftJoinOnNullMatchingField()
);
}

View File

@ -0,0 +1,104 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.esql.optimizer.rules;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.xpack.esql.core.expression.Alias;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.AttributeMap;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
public final class RuleUtils {
private RuleUtils() {}
/**
* Returns a tuple of two lists:
* 1. A list of aliases to null literals for those data types in the {@param outputAttributes} that {@param shouldBeReplaced}.
* 2. A list of named expressions where attributes that match the predicate are replaced with their corresponding null alias.
*
* @param outputAttributes The original output attributes.
* @param shouldBeReplaced A predicate to determine which attributes should be replaced with null aliases.
*/
public static Tuple<List<Alias>, List<NamedExpression>> aliasedNulls(
List<Attribute> outputAttributes,
Predicate<Attribute> shouldBeReplaced
) {
Map<DataType, Alias> nullLiterals = Maps.newLinkedHashMapWithExpectedSize(DataType.types().size());
List<NamedExpression> newProjections = new ArrayList<>(outputAttributes.size());
for (Attribute attr : outputAttributes) {
NamedExpression projection;
if (shouldBeReplaced.test(attr)) {
DataType dt = attr.dataType();
Alias nullAlias = nullLiterals.get(dt);
// save the first field as null (per datatype)
if (nullAlias == null) {
// Keep the same id so downstream query plans don't need updating
// NOTE: THIS IS BRITTLE AND CAN LEAD TO BUGS.
// In case some optimizer rule or so inserts a plan node that requires the field BEFORE the Eval that we're adding
// on top of the EsRelation, this can trigger a field extraction in the physical optimizer phase, causing wrong
// layouts due to a duplicate name id.
// If someone reaches here AGAIN when debugging e.g. ClassCastExceptions NPEs from wrong layouts, we should probably
// give up on this approach and instead insert EvalExecs in InsertFieldExtraction.
Alias alias = new Alias(attr.source(), attr.name(), Literal.of(attr, null), attr.id());
nullLiterals.put(dt, alias);
projection = alias.toAttribute();
}
// otherwise point to it since this avoids creating field copies
else {
projection = new Alias(attr.source(), attr.name(), nullAlias.toAttribute(), attr.id());
}
} else {
projection = attr;
}
newProjections.add(projection);
}
return new Tuple<>(new ArrayList<>(nullLiterals.values()), newProjections);
}
/**
* Collects references to foldables from the given logical plan, returning an {@link AttributeMap} that maps
* foldable aliases to their corresponding literal values.
*
* @param plan The logical plan to analyze.
* @param ctx The optimizer context providing fold context.
* @return An {@link AttributeMap} containing foldable references and their literal values.
*/
public static AttributeMap<Expression> foldableReferences(LogicalPlan plan, LogicalOptimizerContext ctx) {
AttributeMap.Builder<Expression> collectRefsBuilder = AttributeMap.builder();
// collect aliases bottom-up
plan.forEachExpressionUp(Alias.class, a -> {
var c = a.child();
boolean shouldCollect = c.foldable();
// try to resolve the expression based on an existing foldables
if (shouldCollect == false) {
c = c.transformUp(ReferenceAttribute.class, r -> collectRefsBuilder.build().resolve(r, r));
shouldCollect = c.foldable();
}
if (shouldCollect) {
collectRefsBuilder.put(a.toAttribute(), Literal.of(ctx.foldCtx(), c));
}
});
return collectRefsBuilder.build();
}
}

View File

@ -7,12 +7,11 @@
package org.elasticsearch.xpack.esql.optimizer.rules.logical;
import org.elasticsearch.xpack.esql.core.expression.Alias;
import org.elasticsearch.xpack.esql.core.expression.AttributeMap;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
import org.elasticsearch.xpack.esql.optimizer.rules.RuleUtils;
import org.elasticsearch.xpack.esql.plan.logical.Eval;
import org.elasticsearch.xpack.esql.plan.logical.Filter;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
@ -26,24 +25,8 @@ public final class PropagateEvalFoldables extends ParameterizedRule<LogicalPlan,
@Override
public LogicalPlan apply(LogicalPlan plan, LogicalOptimizerContext ctx) {
AttributeMap.Builder<Expression> collectRefsBuilder = AttributeMap.builder();
java.util.function.Function<ReferenceAttribute, Expression> replaceReference = r -> collectRefsBuilder.build().resolve(r, r);
// collect aliases bottom-up
plan.forEachExpressionUp(Alias.class, a -> {
var c = a.child();
boolean shouldCollect = c.foldable();
// try to resolve the expression based on an existing foldables
if (shouldCollect == false) {
c = c.transformUp(ReferenceAttribute.class, replaceReference);
shouldCollect = c.foldable();
}
if (shouldCollect) {
collectRefsBuilder.put(a.toAttribute(), Literal.of(ctx.foldCtx(), c));
}
});
if (collectRefsBuilder.isEmpty()) {
AttributeMap<Expression> collectRefs = RuleUtils.foldableReferences(plan, ctx);
if (collectRefs.isEmpty()) {
return plan;
}
@ -52,7 +35,7 @@ public final class PropagateEvalFoldables extends ParameterizedRule<LogicalPlan,
// TODO: also allow aggregates once aggs on constants are supported.
// C.f. https://github.com/elastic/elasticsearch/issues/100634
if (p instanceof Filter || p instanceof Eval) {
p = p.transformExpressionsOnly(ReferenceAttribute.class, replaceReference);
p = p.transformExpressionsOnly(ReferenceAttribute.class, r -> collectRefs.resolve(r, r));
}
return p;
});

View File

@ -0,0 +1,64 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.esql.optimizer.rules.logical.local;
import org.elasticsearch.xpack.esql.core.expression.AttributeMap;
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.Expressions;
import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
import org.elasticsearch.xpack.esql.optimizer.rules.RuleUtils;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.OptimizerRules;
import org.elasticsearch.xpack.esql.plan.logical.Eval;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.Project;
import org.elasticsearch.xpack.esql.plan.logical.join.Join;
import static org.elasticsearch.xpack.esql.core.expression.Expressions.isGuaranteedNull;
import static org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes.LEFT;
/**
* The rule checks if the join's performed on a field which is aliased to null (in type or value); if that's the case, it prunes the join,
* replacing it with an Eval - returning aliases to null for all the fields added in by the right side of the Join - plus a Project on top
* of it. The rule can apply on the coordinator already, but it's more likely to be effective on the data nodes, where null aliasing is
* inserted due to locally missing fields. This rule relies on that behavior -- see {@link ReplaceFieldWithConstantOrNull}.
*/
public class PruneLeftJoinOnNullMatchingField extends OptimizerRules.ParameterizedOptimizerRule<Join, LogicalOptimizerContext> {
public PruneLeftJoinOnNullMatchingField() {
super(OptimizerRules.TransformDirection.DOWN);
}
@Override
protected LogicalPlan rule(Join join, LogicalOptimizerContext ctx) {
LogicalPlan plan = join;
if (join.config().type() == LEFT) { // other types will have different replacement logic
AttributeMap<Expression> attributeMap = RuleUtils.foldableReferences(join, ctx);
for (var attr : AttributeSet.of(join.config().matchFields())) {
var resolved = attributeMap.resolve(attr);
if (resolved != null && isGuaranteedNull(resolved)) {
plan = replaceJoin(join);
break;
}
}
}
return plan;
}
private static LogicalPlan replaceJoin(Join join) {
var joinRightOutput = join.rightOutputFields();
// can be empty when the join key is null and the rest of the right side entries pruned (such as by an agg)
if (joinRightOutput.isEmpty()) {
return join.left();
}
var aliasedNulls = RuleUtils.aliasedNulls(joinRightOutput, a -> true);
var eval = new Eval(join.source(), join.left(), aliasedNulls.v1());
return new Project(join.source(), eval, join.computeOutput(join.left().output(), Expressions.asAttributes(aliasedNulls.v2())));
}
}

View File

@ -7,18 +7,15 @@
package org.elasticsearch.xpack.esql.optimizer.rules.logical.local;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.xpack.esql.core.expression.Alias;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.core.type.PotentiallyUnmappedKeywordEsField;
import org.elasticsearch.xpack.esql.optimizer.LocalLogicalOptimizerContext;
import org.elasticsearch.xpack.esql.optimizer.rules.RuleUtils;
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
import org.elasticsearch.xpack.esql.plan.logical.Eval;
import org.elasticsearch.xpack.esql.plan.logical.Filter;
@ -29,7 +26,6 @@ import org.elasticsearch.xpack.esql.plan.logical.RegexExtract;
import org.elasticsearch.xpack.esql.plan.logical.TopN;
import org.elasticsearch.xpack.esql.rule.ParameterizedRule;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -92,42 +88,18 @@ public class ReplaceFieldWithConstantOrNull extends ParameterizedRule<LogicalPla
// \_Eval[field1 = null, field3 = null]
// \_EsRelation[field1, field2, field3]
List<Attribute> relationOutput = relation.output();
Map<DataType, Alias> nullLiterals = Maps.newLinkedHashMapWithExpectedSize(DataType.types().size());
List<NamedExpression> newProjections = new ArrayList<>(relationOutput.size());
for (int i = 0, size = relationOutput.size(); i < size; i++) {
Attribute attr = relationOutput.get(i);
NamedExpression projection;
if (attr instanceof FieldAttribute f && shouldBeRetained.test(f) == false) {
DataType dt = f.dataType();
Alias nullAlias = nullLiterals.get(dt);
// save the first field as null (per datatype)
if (nullAlias == null) {
// Keep the same id so downstream query plans don't need updating
// NOTE: THIS IS BRITTLE AND CAN LEAD TO BUGS.
// In case some optimizer rule or so inserts a plan node that requires the field BEFORE the Eval that we're adding
// on top of the EsRelation, this can trigger a field extraction in the physical optimizer phase, causing wrong
// layouts due to a duplicate name id.
// If someone reaches here AGAIN when debugging e.g. ClassCastExceptions NPEs from wrong layouts, we should probably
// give up on this approach and instead insert EvalExecs in InsertFieldExtraction.
Alias alias = new Alias(f.source(), f.name(), Literal.of(f, null), f.id());
nullLiterals.put(dt, alias);
projection = alias.toAttribute();
}
// otherwise point to it since this avoids creating field copies
else {
projection = new Alias(f.source(), f.name(), nullAlias.toAttribute(), f.id());
}
} else {
projection = attr;
}
newProjections.add(projection);
}
var aliasedNulls = RuleUtils.aliasedNulls(
relationOutput,
attr -> attr instanceof FieldAttribute f && shouldBeRetained.test(f) == false
);
var nullLiterals = aliasedNulls.v1();
var newProjections = aliasedNulls.v2();
if (nullLiterals.size() == 0) {
return plan;
}
Eval eval = new Eval(plan.source(), relation, new ArrayList<>(nullLiterals.values()));
Eval eval = new Eval(plan.source(), relation, nullLiterals);
// This projection is redundant if there's another projection downstream (and no commands depend on the order until we hit it).
return new Project(plan.source(), eval, newProjections);
}

View File

@ -70,8 +70,10 @@ import org.elasticsearch.xpack.esql.plan.physical.EvalExec;
import org.elasticsearch.xpack.esql.plan.physical.ExchangeExec;
import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
import org.elasticsearch.xpack.esql.plan.physical.FilterExec;
import org.elasticsearch.xpack.esql.plan.physical.GrokExec;
import org.elasticsearch.xpack.esql.plan.physical.LimitExec;
import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec;
import org.elasticsearch.xpack.esql.plan.physical.LookupJoinExec;
import org.elasticsearch.xpack.esql.plan.physical.MvExpandExec;
import org.elasticsearch.xpack.esql.plan.physical.ParallelExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
@ -116,6 +118,7 @@ import static org.elasticsearch.xpack.esql.EsqlTestUtils.emptyInferenceResolutio
import static org.elasticsearch.xpack.esql.EsqlTestUtils.loadMapping;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.unboundLogicalOptimizerContext;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning;
import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.defaultLookupResolution;
import static org.elasticsearch.xpack.esql.core.querydsl.query.Query.unscore;
import static org.elasticsearch.xpack.esql.plan.physical.EsStatsQueryExec.StatsType;
import static org.hamcrest.Matchers.contains;
@ -221,7 +224,14 @@ public class LocalPhysicalPlanOptimizerTests extends MapperServiceTestCase {
IndexResolution getIndexResult = IndexResolution.valid(test);
return new Analyzer(
new AnalyzerContext(config, new EsqlFunctionRegistry(), getIndexResult, enrichResolution, emptyInferenceResolution()),
new AnalyzerContext(
config,
new EsqlFunctionRegistry(),
getIndexResult,
defaultLookupResolution(),
enrichResolution,
emptyInferenceResolution()
),
new Verifier(new Metrics(new EsqlFunctionRegistry()), new XPackLicenseState(() -> 0L))
);
}
@ -1394,6 +1404,174 @@ public class LocalPhysicalPlanOptimizerTests extends MapperServiceTestCase {
assertThat(Expressions.names(fields), contains("_meta_field", "gender", "hire_date", "job", "job.raw", "languages", "long_noidx"));
}
/*
* LimitExec[1000[INTEGER]]
* \_AggregateExec[[language_code{r}#6],[COUNT(emp_no{f}#12,true[BOOLEAN]) AS c#11, language_code{r}#6],FINAL,[language_code{r}#6, $
* $c$count{r}#25, $$c$seen{r}#26],12]
* \_ExchangeExec[[language_code{r}#6, $$c$count{r}#25, $$c$seen{r}#26],true]
* \_AggregateExec[[languages{r}#15],[COUNT(emp_no{f}#12,true[BOOLEAN]) AS c#11, languages{r}#15 AS language_code#6],INITIAL,[langua
* ges{r}#15, $$c$count{r}#27, $$c$seen{r}#28],12]
* \_FieldExtractExec[emp_no{f}#12]<[],[]>
* \_EvalExec[[null[INTEGER] AS languages#15]]
* \_EsQueryExec[test], indexMode[standard], query[][_doc{f}#29], limit[], sort[] estimatedRowSize[12]
*/
public void testMissingFieldsPurgesTheJoinLocally() {
var stats = EsqlTestUtils.statsForMissingField("languages");
var plan = plannerOptimizer.plan("""
from test
| keep emp_no, languages
| rename languages AS language_code
| lookup join languages_lookup ON language_code
| stats c = count(emp_no) by language_code
""", stats);
var limit = as(plan, LimitExec.class);
var agg = as(limit.child(), AggregateExec.class);
assertThat(Expressions.names(agg.output()), contains("c", "language_code"));
var exchange = as(agg.child(), ExchangeExec.class);
agg = as(exchange.child(), AggregateExec.class);
var extract = as(agg.child(), FieldExtractExec.class);
var eval = as(extract.child(), EvalExec.class);
var source = as(eval.child(), EsQueryExec.class);
}
/*
* LimitExec[1000[INTEGER]]
* \_AggregateExec[[language_code{r}#7],[COUNT(emp_no{r}#31,true[BOOLEAN]) AS c#17, language_code{r}#7],FINAL,[language_code{r}#7, $
* $c$count{r}#32, $$c$seen{r}#33],12]
* \_ExchangeExec[[language_code{r}#7, $$c$count{r}#32, $$c$seen{r}#33],true]
* \_AggregateExec[[language_code{r}#7],[COUNT(emp_no{r}#31,true[BOOLEAN]) AS c#17, language_code{r}#7],INITIAL,[language_code{r}#7,
* $$c$count{r}#34, $$c$seen{r}#35],12]
* \_GrokExec[first_name{f}#19,Parser[pattern=%{WORD:foo}, grok=org.elasticsearch.grok.Grok@75389ac1],[foo{r}#12]]
* \_MvExpandExec[emp_no{f}#18,emp_no{r}#31]
* \_ProjectExec[[emp_no{f}#18, languages{r}#21 AS language_code#7, first_name{f}#19]]
* \_FieldExtractExec[emp_no{f}#18, first_name{f}#19]<[],[]>
* \_EvalExec[[null[INTEGER] AS languages#21]]
* \_EsQueryExec[test], indexMode[standard], query[][_doc{f}#36], limit[], sort[] estimatedRowSize[112]
*/
public void testMissingFieldsPurgesTheJoinLocallyThroughCommands() {
var stats = EsqlTestUtils.statsForMissingField("languages");
var plan = plannerOptimizer.plan("""
from test
| keep emp_no, languages, first_name
| rename languages AS language_code
| mv_expand emp_no
| grok first_name "%{WORD:foo}"
| lookup join languages_lookup ON language_code
| stats c = count(emp_no) by language_code
""", stats);
var limit = as(plan, LimitExec.class);
var agg = as(limit.child(), AggregateExec.class);
assertThat(Expressions.names(agg.output()), contains("c", "language_code"));
var exchange = as(agg.child(), ExchangeExec.class);
agg = as(exchange.child(), AggregateExec.class);
var grok = as(agg.child(), GrokExec.class);
var mvexpand = as(grok.child(), MvExpandExec.class);
var project = as(mvexpand.child(), ProjectExec.class);
var extract = as(project.child(), FieldExtractExec.class);
var eval = as(extract.child(), EvalExec.class);
var source = as(eval.child(), EsQueryExec.class);
}
/*
* LimitExec[1000[INTEGER]]
* \_AggregateExec[[language_code{r}#12],[COUNT(emp_no{r}#31,true[BOOLEAN]) AS c#17, language_code{r}#12],FINAL,[language_code{r}#12
* , $$c$count{r}#32, $$c$seen{r}#33],12]
* \_ExchangeExec[[language_code{r}#12, $$c$count{r}#32, $$c$seen{r}#33],true]
* \_AggregateExec[[language_code{r}#12],[COUNT(emp_no{r}#31,true[BOOLEAN]) AS c#17, language_code{r}#12],INITIAL,[language_code{r}#
* 12, $$c$count{r}#34, $$c$seen{r}#35],12]
* \_LookupJoinExec[[language_code{r}#12],[language_code{f}#29],[]]
* |_GrokExec[first_name{f}#19,Parser[pattern=%{NUMBER:language_code:int}, grok=org.elasticsearch.grok.Grok@764e5109],[languag
* e_code{r}#12]]
* | \_MvExpandExec[emp_no{f}#18,emp_no{r}#31]
* | \_ProjectExec[[emp_no{f}#18, languages{r}#21 AS language_code#7, first_name{f}#19]]
* | \_FieldExtractExec[emp_no{f}#18, first_name{f}#19]<[],[]>
* | \_EvalExec[[null[INTEGER] AS languages#21]]
* | \_EsQueryExec[test], indexMode[standard], query[][_doc{f}#36], limit[], sort[] estimatedRowSize[66]
* \_EsQueryExec[languages_lookup], indexMode[lookup], query[][_doc{f}#37], limit[], sort[] estimatedRowSize[4]
*/
public void testMissingFieldsNotPurgingTheJoinLocally() {
var stats = EsqlTestUtils.statsForMissingField("languages");
var plan = plannerOptimizer.plan("""
from test
| keep emp_no, languages, first_name
| rename languages AS language_code
| mv_expand emp_no
| grok first_name "%{NUMBER:language_code:int}" // this reassigns language_code
| lookup join languages_lookup ON language_code
| stats c = count(emp_no) by language_code
""", stats);
var limit = as(plan, LimitExec.class);
var agg = as(limit.child(), AggregateExec.class);
assertThat(Expressions.names(agg.output()), contains("c", "language_code"));
var exchange = as(agg.child(), ExchangeExec.class);
agg = as(exchange.child(), AggregateExec.class);
var join = as(agg.child(), LookupJoinExec.class);
var grok = as(join.left(), GrokExec.class);
var mvexpand = as(grok.child(), MvExpandExec.class);
var project = as(mvexpand.child(), ProjectExec.class);
var extract = as(project.child(), FieldExtractExec.class);
var eval = as(extract.child(), EvalExec.class);
var source = as(eval.child(), EsQueryExec.class);
var right = as(join.right(), EsQueryExec.class);
}
/*
* LimitExec[1000[INTEGER]]
* \_LookupJoinExec[[language_code{r}#6],[language_code{f}#23],[language_name{f}#24]]
* |_LimitExec[1000[INTEGER]]
* | \_AggregateExec[[languages{f}#15],[COUNT(emp_no{f}#12,true[BOOLEAN]) AS c#10, languages{f}#15 AS language_code#6],FINAL,[language
* s{f}#15, $$c$count{r}#25, $$c$seen{r}#26],62]
* | \_ExchangeExec[[languages{f}#15, $$c$count{r}#25, $$c$seen{r}#26],true]
* | \_AggregateExec[[languages{r}#15],[COUNT(emp_no{f}#12,true[BOOLEAN]) AS c#10, languages{r}#15 AS language_code#6],INITIAL,
* [languages{r}#15, $$c$count{r}#27, $$c$seen{r}#28],12]
* | \_FieldExtractExec[emp_no{f}#12]<[],[]>
* | \_EvalExec[[null[INTEGER] AS languages#15]]
* | \_EsQueryExec[test], indexMode[standard], query[][_doc{f}#29], limit[], sort[] estimatedRowSize[12]
* \_EsQueryExec[languages_lookup], indexMode[lookup], query[][_doc{f}#30], limit[], sort[] estimatedRowSize[4]
*/
public void testMissingFieldsDoesNotPurgeTheJoinOnCoordinator() {
var stats = EsqlTestUtils.statsForMissingField("languages");
// same as the query above, but with the last two lines swapped, so that the join is no longer pushed to the data nodes
var plan = plannerOptimizer.plan("""
from test
| keep emp_no, languages
| rename languages AS language_code
| stats c = count(emp_no) by language_code
| lookup join languages_lookup ON language_code
""", stats);
var limit = as(plan, LimitExec.class);
var join = as(limit.child(), LookupJoinExec.class);
limit = as(join.left(), LimitExec.class);
var agg = as(limit.child(), AggregateExec.class);
var exchange = as(agg.child(), ExchangeExec.class);
agg = as(exchange.child(), AggregateExec.class);
var extract = as(agg.child(), FieldExtractExec.class);
var eval = as(extract.child(), EvalExec.class);
assertThat(eval.fields().size(), is(1));
var alias = as(eval.fields().getFirst(), Alias.class);
assertThat(alias.name(), is("languages"));
var literal = as(alias.child(), Literal.class);
assertNull(literal.value());
var source = as(eval.child(), EsQueryExec.class);
assertThat(source.indexPattern(), is("test"));
assertThat(source.indexMode(), is(IndexMode.STANDARD));
source = as(join.right(), EsQueryExec.class);
assertThat(source.indexPattern(), is("languages_lookup"));
assertThat(source.indexMode(), is(IndexMode.LOOKUP));
}
/*
Checks that match filters are pushed down to Lucene when using no casting, for example:
WHERE first_name:"Anna")

View File

@ -2782,6 +2782,50 @@ public class LogicalPlanOptimizerTests extends ESTestCase {
var localRelation = as(limitBefore.child(), LocalRelation.class);
}
/*
* EsqlProject[[emp_no{f}#9, first_name{f}#10, languages{f}#12, language_code{r}#3, language_name{r}#22]]
* \_Eval[[null[INTEGER] AS language_code#3, null[KEYWORD] AS language_name#22]]
* \_Limit[1000[INTEGER],false]
* \_EsRelation[test][_meta_field{f}#15, emp_no{f}#9, first_name{f}#10, g..]
*/
public void testPruneJoinOnNullMatchingField() {
var plan = optimizedPlan("""
from test
| eval language_code = null::integer
| keep emp_no, first_name, languages, language_code
| lookup join languages_lookup on language_code
""");
var project = as(plan, Project.class);
assertThat(Expressions.names(project.output()), contains("emp_no", "first_name", "languages", "language_code", "language_name"));
var eval = as(project.child(), Eval.class);
var limit = asLimit(eval.child(), 1000, false);
var source = as(limit.child(), EsRelation.class);
}
/*
* EsqlProject[[emp_no{f}#15, first_name{f}#16, my_null{r}#3 AS language_code#9, language_name{r}#27]]
* \_Eval[[null[INTEGER] AS my_null#3, null[KEYWORD] AS language_name#27]]
* \_Limit[1000[INTEGER],false]
* \_EsRelation[test][_meta_field{f}#21, emp_no{f}#15, first_name{f}#16, ..]
*/
public void testPruneJoinOnNullAssignedMatchingField() {
var plan = optimizedPlan("""
from test
| eval my_null = null::integer
| rename languages as language_code
| eval language_code = my_null
| lookup join languages_lookup on language_code
| keep emp_no, first_name, language_code, language_name
""");
var project = as(plan, EsqlProject.class);
assertThat(Expressions.names(project.output()), contains("emp_no", "first_name", "language_code", "language_name"));
var eval = as(project.child(), Eval.class);
var limit = asLimit(eval.child(), 1000, false);
var source = as(limit.child(), EsRelation.class);
}
private static List<String> orderNames(TopN topN) {
return topN.order().stream().map(o -> as(o.child(), NamedExpression.class).name()).toList();
}

View File

@ -7667,7 +7667,7 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
// The TopN needs an estimated row size for the planner to work
var plans = PlannerUtils.breakPlanBetweenCoordinatorAndDataNode(EstimatesRowSize.estimateRowSize(0, plan), config);
plan = useDataNodePlan ? plans.v2() : plans.v1();
plan = PlannerUtils.localPlan(List.of(), config, FoldContext.small(), plan);
plan = PlannerUtils.localPlan(config, FoldContext.small(), plan, TEST_SEARCH_STATS);
ExchangeSinkHandler exchangeSinkHandler = new ExchangeSinkHandler(null, 10, () -> 10);
LocalExecutionPlanner planner = new LocalExecutionPlanner(
"test",