ESQL: Dependency check for binary plans (#118326)

Make the dependency checker for query plans take into account binary plans and make sure that fields required from the left hand side are actually obtained from there (and analogously for the right).
This commit is contained in:
Alexander Spies 2024-12-13 11:38:53 +01:00 committed by GitHub
parent 67e3302bb4
commit 140d88c59a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 201 additions and 34 deletions

View File

@ -8,4 +8,4 @@ Groups text messages into categories of similarly formatted text values.
* can't be used within other expressions
* can't be used with multiple groupings
* can't be used or referenced within aggregations
* can't be used or referenced within aggregate functions

View File

@ -78,7 +78,7 @@
}
],
"examples" : [
"from books \n| where term(author, \"gabriel\") \n| keep book_no, title\n| limit 3;"
"FROM books \n| WHERE TERM(author, \"gabriel\") \n| KEEP book_no, title\n| LIMIT 3;"
],
"preview" : true,
"snapshot_only" : true

View File

@ -6,8 +6,8 @@ This is generated by ESQL's AbstractFunctionTestCase. Do no edit it. See ../READ
Performs a Term query on the specified field. Returns true if the provided term matches the row.
```
from books
| where term(author, "gabriel")
| keep book_no, title
| limit 3;
FROM books
| WHERE TERM(author, "gabriel")
| KEEP book_no, title
| LIMIT 3;
```

View File

@ -25,6 +25,9 @@ public class EsIndex implements Writeable {
private final Map<String, EsField> mapping;
private final Map<String, IndexMode> indexNameWithModes;
/**
* Intended for tests. Returns an index with an empty index mode map.
*/
public EsIndex(String name, Map<String, EsField> mapping) {
this(name, mapping, Map.of());
}

View File

@ -14,7 +14,6 @@ import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
public final class LogicalVerifier {
private static final PlanConsistencyChecker<LogicalPlan> DEPENDENCY_CHECK = new PlanConsistencyChecker<>();
public static final LogicalVerifier INSTANCE = new LogicalVerifier();
private LogicalVerifier() {}
@ -25,7 +24,7 @@ public final class LogicalVerifier {
Failures dependencyFailures = new Failures();
plan.forEachUp(p -> {
DEPENDENCY_CHECK.checkPlan(p, dependencyFailures);
PlanConsistencyChecker.checkPlan(p, dependencyFailures);
if (failures.hasFailures() == false) {
p.forEachExpression(ex -> {

View File

@ -13,7 +13,6 @@ import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.Expressions;
import org.elasticsearch.xpack.esql.optimizer.rules.PlanConsistencyChecker;
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
import org.elasticsearch.xpack.esql.plan.physical.EnrichExec;
import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
@ -28,7 +27,6 @@ import static org.elasticsearch.xpack.esql.common.Failure.fail;
public final class PhysicalVerifier {
public static final PhysicalVerifier INSTANCE = new PhysicalVerifier();
private static final PlanConsistencyChecker<PhysicalPlan> DEPENDENCY_CHECK = new PlanConsistencyChecker<>();
private PhysicalVerifier() {}
@ -44,11 +42,6 @@ public final class PhysicalVerifier {
}
plan.forEachDown(p -> {
if (p instanceof AggregateExec agg) {
var exclude = Expressions.references(agg.ordinalAttributes());
DEPENDENCY_CHECK.checkPlan(p, exclude, depFailures);
return;
}
if (p instanceof FieldExtractExec fieldExtractExec) {
Attribute sourceAttribute = fieldExtractExec.sourceAttribute();
if (sourceAttribute == null) {
@ -62,7 +55,7 @@ public final class PhysicalVerifier {
);
}
}
DEPENDENCY_CHECK.checkPlan(p, depFailures);
PlanConsistencyChecker.checkPlan(p, depFailures);
});
if (depFailures.hasFailures()) {

View File

@ -12,31 +12,42 @@ import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
import org.elasticsearch.xpack.esql.core.expression.NameId;
import org.elasticsearch.xpack.esql.plan.QueryPlan;
import org.elasticsearch.xpack.esql.plan.logical.BinaryPlan;
import org.elasticsearch.xpack.esql.plan.physical.BinaryExec;
import java.util.HashSet;
import java.util.Set;
import static org.elasticsearch.xpack.esql.common.Failure.fail;
public class PlanConsistencyChecker<P extends QueryPlan<P>> {
public class PlanConsistencyChecker {
/**
* Check whether a single {@link QueryPlan} produces no duplicate attributes and its children provide all of its required
* {@link QueryPlan#references() references}. Otherwise, add
* {@link org.elasticsearch.xpack.esql.common.Failure Failure}s to the {@link Failures} object.
*/
public void checkPlan(P p, Failures failures) {
checkPlan(p, AttributeSet.EMPTY, failures);
}
public void checkPlan(P p, AttributeSet exclude, Failures failures) {
AttributeSet refs = p.references();
AttributeSet input = p.inputSet();
AttributeSet missing = refs.subtract(input).subtract(exclude);
// TODO: for Joins, we should probably check if the required fields from the left child are actually in the left child, not
// just any child (and analogously for the right child).
if (missing.isEmpty() == false) {
failures.add(fail(p, "Plan [{}] optimized incorrectly due to missing references {}", p.nodeString(), missing));
public static void checkPlan(QueryPlan<?> p, Failures failures) {
if (p instanceof BinaryPlan binaryPlan) {
checkMissingBinary(
p,
binaryPlan.leftReferences(),
binaryPlan.left().outputSet(),
binaryPlan.rightReferences(),
binaryPlan.right().outputSet(),
failures
);
} else if (p instanceof BinaryExec binaryExec) {
checkMissingBinary(
p,
binaryExec.leftReferences(),
binaryExec.left().outputSet(),
binaryExec.rightReferences(),
binaryExec.right().outputSet(),
failures
);
} else {
checkMissing(p, p.references(), p.inputSet(), "missing references", failures);
}
Set<String> outputAttributeNames = new HashSet<>();
@ -49,4 +60,29 @@ public class PlanConsistencyChecker<P extends QueryPlan<P>> {
}
}
}
private static void checkMissingBinary(
QueryPlan<?> plan,
AttributeSet leftReferences,
AttributeSet leftInput,
AttributeSet rightReferences,
AttributeSet rightInput,
Failures failures
) {
checkMissing(plan, leftReferences, leftInput, "missing references from left hand side", failures);
checkMissing(plan, rightReferences, rightInput, "missing references from right hand side", failures);
}
private static void checkMissing(
QueryPlan<?> plan,
AttributeSet references,
AttributeSet input,
String detailErrorMessage,
Failures failures
) {
AttributeSet missing = references.subtract(input);
if (missing.isEmpty() == false) {
failures.add(fail(plan, "Plan [{}] optimized incorrectly due to {} {}", plan.nodeString(), detailErrorMessage, missing));
}
}
}

View File

@ -6,6 +6,7 @@
*/
package org.elasticsearch.xpack.esql.plan.logical;
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
import org.elasticsearch.xpack.esql.core.tree.Source;
import java.util.Arrays;
@ -30,6 +31,10 @@ public abstract class BinaryPlan extends LogicalPlan {
return right;
}
public abstract AttributeSet leftReferences();
public abstract AttributeSet rightReferences();
@Override
public final BinaryPlan replaceChildren(List<LogicalPlan> newChildren) {
return replaceChildren(newChildren.get(0), newChildren.get(1));

View File

@ -12,6 +12,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
import org.elasticsearch.xpack.esql.core.expression.Expressions;
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.Source;
@ -97,6 +98,16 @@ public class Join extends BinaryPlan {
return lazyOutput;
}
@Override
public AttributeSet leftReferences() {
return Expressions.references(config().leftFields());
}
@Override
public AttributeSet rightReferences() {
return Expressions.references(config().rightFields());
}
public List<Attribute> rightOutputFields() {
AttributeSet leftInputs = left().outputSet();

View File

@ -184,7 +184,9 @@ public class AggregateExec extends UnaryExec implements EstimatesRowSize {
@Override
protected AttributeSet computeReferences() {
return mode.isInputPartial() ? new AttributeSet(intermediateAttributes) : Aggregate.computeReferences(aggregates, groupings);
return mode.isInputPartial()
? new AttributeSet(intermediateAttributes)
: Aggregate.computeReferences(aggregates, groupings).subtract(new AttributeSet(ordinalAttributes()));
}
/** Returns the attributes that can be loaded from ordinals -- no explicit extraction is needed */

View File

@ -8,6 +8,7 @@
package org.elasticsearch.xpack.esql.plan.physical;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
import org.elasticsearch.xpack.esql.core.tree.Source;
import java.io.IOException;
@ -40,6 +41,10 @@ public abstract class BinaryExec extends PhysicalPlan {
return right;
}
public abstract AttributeSet leftReferences();
public abstract AttributeSet rightReferences();
@Override
public void writeTo(StreamOutput out) throws IOException {
Source.EMPTY.writeTo(out);

View File

@ -119,6 +119,16 @@ public class HashJoinExec extends BinaryExec implements EstimatesRowSize {
return Expressions.references(leftFields);
}
@Override
public AttributeSet leftReferences() {
return Expressions.references(leftFields);
}
@Override
public AttributeSet rightReferences() {
return Expressions.references(rightFields);
}
@Override
public HashJoinExec replaceChildren(PhysicalPlan left, PhysicalPlan right) {
return new HashJoinExec(source(), left, right, matchFields, leftFields, rightFields, output);

View File

@ -119,6 +119,21 @@ public class LookupJoinExec extends BinaryExec implements EstimatesRowSize {
return Expressions.references(leftFields);
}
@Override
public AttributeSet leftReferences() {
return Expressions.references(leftFields);
}
@Override
public AttributeSet rightReferences() {
// TODO: currently it's hard coded that we add all fields from the lookup index. But the output we "officially" get from the right
// hand side is inconsistent:
// - After logical optimization, there's a FragmentExec with an EsRelation on the right hand side with all the fields.
// - After local physical optimization, there's just an EsQueryExec here, with no fields other than _doc mentioned and we don't
// insert field extractions in the plan, either.
return AttributeSet.EMPTY;
}
@Override
public LookupJoinExec replaceChildren(PhysicalPlan left, PhysicalPlan right) {
return new LookupJoinExec(source(), left, right, leftFields, rightFields, addedFields);

View File

@ -7,6 +7,7 @@
package org.elasticsearch.xpack.esql.analysis;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.esql.EsqlTestUtils;
import org.elasticsearch.xpack.esql.enrich.ResolvedEnrichPolicy;
@ -104,6 +105,11 @@ public final class AnalyzerTestUtils {
return analyzer.analyze(plan);
}
public static IndexResolution loadMapping(String resource, String indexName, IndexMode indexMode) {
EsIndex test = new EsIndex(indexName, EsqlTestUtils.loadMapping(resource), Map.of(indexName, indexMode));
return IndexResolution.valid(test);
}
public static IndexResolution loadMapping(String resource, String indexName) {
EsIndex test = new EsIndex(indexName, EsqlTestUtils.loadMapping(resource));
return IndexResolution.valid(test);
@ -118,7 +124,7 @@ public final class AnalyzerTestUtils {
}
public static IndexResolution defaultLookupResolution() {
return loadMapping("mapping-languages.json", "languages_lookup");
return loadMapping("mapping-languages.json", "languages_lookup", IndexMode.LOOKUP);
}
public static EnrichResolution defaultEnrichResolution() {

View File

@ -149,6 +149,7 @@ import static org.elasticsearch.xpack.esql.EsqlTestUtils.referenceAttribute;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning;
import static org.elasticsearch.xpack.esql.analysis.Analyzer.NO_FIELDS;
import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.analyze;
import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.defaultLookupResolution;
import static org.elasticsearch.xpack.esql.core.expression.Literal.NULL;
import static org.elasticsearch.xpack.esql.core.tree.Source.EMPTY;
import static org.elasticsearch.xpack.esql.core.type.DataType.DOUBLE;
@ -221,7 +222,13 @@ public class LogicalPlanOptimizerTests extends ESTestCase {
EsIndex test = new EsIndex("test", mapping, Map.of("test", IndexMode.STANDARD));
IndexResolution getIndexResult = IndexResolution.valid(test);
analyzer = new Analyzer(
new AnalyzerContext(EsqlTestUtils.TEST_CFG, new EsqlFunctionRegistry(), getIndexResult, enrichResolution),
new AnalyzerContext(
EsqlTestUtils.TEST_CFG,
new EsqlFunctionRegistry(),
getIndexResult,
defaultLookupResolution(),
enrichResolution
),
TEST_VERIFIER
);
@ -4896,6 +4903,26 @@ public class LogicalPlanOptimizerTests extends ESTestCase {
assertThat(e.getMessage(), containsString(" optimized incorrectly due to missing references [salary"));
}
public void testPlanSanityCheckWithBinaryPlans() throws Exception {
var plan = optimizedPlan("""
FROM test
| RENAME languages AS language_code
| LOOKUP JOIN languages_lookup ON language_code
""");
var project = as(plan, Project.class);
var limit = as(project.child(), Limit.class);
var join = as(limit.child(), Join.class);
var joinWithInvalidLeftPlan = join.replaceChildren(join.right(), join.right());
IllegalStateException e = expectThrows(IllegalStateException.class, () -> logicalOptimizer.optimize(joinWithInvalidLeftPlan));
assertThat(e.getMessage(), containsString(" optimized incorrectly due to missing references from left hand side [language_code"));
var joinWithInvalidRightPlan = join.replaceChildren(join.left(), join.left());
e = expectThrows(IllegalStateException.class, () -> logicalOptimizer.optimize(joinWithInvalidRightPlan));
assertThat(e.getMessage(), containsString(" optimized incorrectly due to missing references from right hand side [language_code"));
}
// https://github.com/elastic/elasticsearch/issues/104995
public void testNoWrongIsNotNullPruning() {
var plan = optimizedPlan("""

View File

@ -115,6 +115,7 @@ import org.elasticsearch.xpack.esql.plan.physical.GrokExec;
import org.elasticsearch.xpack.esql.plan.physical.HashJoinExec;
import org.elasticsearch.xpack.esql.plan.physical.LimitExec;
import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec;
import org.elasticsearch.xpack.esql.plan.physical.LookupJoinExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.plan.physical.ProjectExec;
import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
@ -155,6 +156,7 @@ import static org.elasticsearch.xpack.esql.EsqlTestUtils.statsForMissingField;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning;
import static org.elasticsearch.xpack.esql.SerializationTestUtils.assertSerialization;
import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.analyze;
import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.defaultLookupResolution;
import static org.elasticsearch.xpack.esql.core.expression.Expressions.name;
import static org.elasticsearch.xpack.esql.core.expression.Expressions.names;
import static org.elasticsearch.xpack.esql.core.expression.function.scalar.FunctionTestUtils.l;
@ -281,16 +283,30 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
String indexName,
String mappingFileName,
EsqlFunctionRegistry functionRegistry,
IndexResolution lookupResolution,
EnrichResolution enrichResolution,
SearchStats stats
) {
Map<String, EsField> mapping = loadMapping(mappingFileName);
EsIndex index = new EsIndex(indexName, mapping, Map.of("test", IndexMode.STANDARD));
IndexResolution getIndexResult = IndexResolution.valid(index);
Analyzer analyzer = new Analyzer(new AnalyzerContext(config, functionRegistry, getIndexResult, enrichResolution), TEST_VERIFIER);
Analyzer analyzer = new Analyzer(
new AnalyzerContext(config, functionRegistry, getIndexResult, lookupResolution, enrichResolution),
TEST_VERIFIER
);
return new TestDataSource(mapping, index, analyzer, stats);
}
TestDataSource makeTestDataSource(
String indexName,
String mappingFileName,
EsqlFunctionRegistry functionRegistry,
EnrichResolution enrichResolution,
SearchStats stats
) {
return makeTestDataSource(indexName, mappingFileName, functionRegistry, defaultLookupResolution(), enrichResolution, stats);
}
TestDataSource makeTestDataSource(
String indexName,
String mappingFileName,
@ -2312,6 +2328,39 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
assertThat(e.getMessage(), containsString(" > 10[INTEGER]]] optimized incorrectly due to missing references [emp_no{f}#"));
}
public void testVerifierOnMissingReferencesWithBinaryPlans() throws Exception {
// Do not assert serialization:
// This will have a LookupJoinExec, which is not serializable because it doesn't leave the coordinator.
var plan = physicalPlan("""
FROM test
| RENAME languages AS language_code
| SORT language_code
| LOOKUP JOIN languages_lookup ON language_code
""", testData, false);
var planWithInvalidJoinLeftSide = plan.transformUp(LookupJoinExec.class, join -> join.replaceChildren(join.right(), join.right()));
var e = expectThrows(IllegalStateException.class, () -> physicalPlanOptimizer.verify(planWithInvalidJoinLeftSide));
assertThat(e.getMessage(), containsString(" optimized incorrectly due to missing references from left hand side [language_code"));
var planWithInvalidJoinRightSide = plan.transformUp(
LookupJoinExec.class,
// LookupJoinExec.rightReferences() is currently EMPTY (hack); use a HashJoinExec instead.
join -> new HashJoinExec(
join.source(),
join.left(),
join.left(),
join.leftFields(),
join.leftFields(),
join.rightFields(),
join.output()
)
);
e = expectThrows(IllegalStateException.class, () -> physicalPlanOptimizer.verify(planWithInvalidJoinRightSide));
assertThat(e.getMessage(), containsString(" optimized incorrectly due to missing references from right hand side [language_code"));
}
public void testVerifierOnDuplicateOutputAttributes() {
var plan = physicalPlan("""
from test
@ -6863,11 +6912,17 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
}
private PhysicalPlan physicalPlan(String query, TestDataSource dataSource) {
return physicalPlan(query, dataSource, true);
}
private PhysicalPlan physicalPlan(String query, TestDataSource dataSource, boolean assertSerialization) {
var logical = logicalOptimizer.optimize(dataSource.analyzer.analyze(parser.createStatement(query)));
// System.out.println("Logical\n" + logical);
var physical = mapper.map(logical);
// System.out.println(physical);
assertSerialization(physical);
if (assertSerialization) {
assertSerialization(physical);
}
return physical;
}