ESQL: INLINESTATS implementation with multiple LogicalPlan updates (#128917)
Part of https://github.com/elastic/elasticsearch/issues/124715 and similar to https://github.com/elastic/elasticsearch/pull/128476. Different from https://github.com/elastic/elasticsearch/pull/128476 in that it takes a "LogicalPlan" approach to running a sub-query, integrating its result back in the "main" LogicalPlan and continuing running the query.
This commit is contained in:
parent
3025f6cb27
commit
c9a4206c00
|
@ -0,0 +1,6 @@
|
|||
pr: 128917
|
||||
summary: Adopt a "LogicalPlan" approach to running multiple sub-queries (with INLINESTATS
|
||||
so far)
|
||||
area: ES|QL
|
||||
type: enhancement
|
||||
issues: []
|
|
@ -333,6 +333,7 @@ public class TransportVersions {
|
|||
public static final TransportVersion ESQL_SERIALIZE_TIMESERIES_FIELD_TYPE = def(9_114_0_00);
|
||||
public static final TransportVersion ML_INFERENCE_IBM_WATSONX_COMPLETION_ADDED = def(9_115_0_00);
|
||||
public static final TransportVersion ESQL_SPLIT_ON_BIG_VALUES = def(9_116_0_00);
|
||||
public static final TransportVersion ESQL_LOCAL_RELATION_WITH_NEW_BLOCKS = def(9_117_0_00);
|
||||
|
||||
/*
|
||||
* STOP! READ THIS FIRST! No, really,
|
||||
|
|
|
@ -49,7 +49,7 @@ import static org.elasticsearch.xpack.esql.EsqlTestUtils.classpathResources;
|
|||
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.FORK_V9;
|
||||
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS;
|
||||
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V2;
|
||||
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V7;
|
||||
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V8;
|
||||
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_LOOKUP_V12;
|
||||
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.JOIN_PLANNING_V1;
|
||||
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.METADATA_FIELDS_REMOTE_TEST;
|
||||
|
@ -128,7 +128,7 @@ public class MultiClusterSpecIT extends EsqlSpecTestCase {
|
|||
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS.capabilityName()));
|
||||
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS_V2.capabilityName()));
|
||||
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_PLANNING_V1.capabilityName()));
|
||||
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS_V7.capabilityName()));
|
||||
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS_V8.capabilityName()));
|
||||
assumeFalse("LOOKUP JOIN not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_LOOKUP_V12.capabilityName()));
|
||||
// Unmapped fields require a coorect capability response from every cluster, which isn't currently implemented.
|
||||
assumeFalse("UNMAPPED FIELDS not yet supported in CCS", testCase.requiredCapabilities.contains(UNMAPPED_FIELDS.capabilityName()));
|
||||
|
|
|
@ -83,6 +83,7 @@ import org.elasticsearch.xpack.esql.plan.logical.Enrich;
|
|||
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.Limit;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.local.EmptyLocalSupplier;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
|
||||
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
|
||||
|
@ -446,7 +447,7 @@ public final class EsqlTestUtils {
|
|||
}
|
||||
|
||||
public static LogicalPlan emptySource() {
|
||||
return new LocalRelation(Source.EMPTY, emptyList(), LocalSupplier.EMPTY);
|
||||
return new LocalRelation(Source.EMPTY, emptyList(), EmptyLocalSupplier.EMPTY);
|
||||
}
|
||||
|
||||
public static LogicalPlan localSource(BlockFactory blockFactory, List<Attribute> fields, List<Object> row) {
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -906,7 +906,7 @@ public class EsqlCapabilities {
|
|||
* Fixes a series of issues with inlinestats which had an incomplete implementation after lookup and inlinestats
|
||||
* were refactored.
|
||||
*/
|
||||
INLINESTATS_V7(EsqlPlugin.INLINESTATS_FEATURE_FLAG),
|
||||
INLINESTATS_V8(EsqlPlugin.INLINESTATS_FEATURE_FLAG),
|
||||
|
||||
/**
|
||||
* Support partial_results
|
||||
|
|
|
@ -20,6 +20,7 @@ import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
|
|||
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.local.EmptyLocalSupplier;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
|
||||
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
|
||||
|
@ -36,7 +37,7 @@ public class PropagateEmptyRelation extends OptimizerRules.ParameterizedOptimize
|
|||
@Override
|
||||
protected LogicalPlan rule(UnaryPlan plan, LogicalOptimizerContext ctx) {
|
||||
LogicalPlan p = plan;
|
||||
if (plan.child() instanceof LocalRelation local && local.supplier() == LocalSupplier.EMPTY) {
|
||||
if (plan.child() instanceof LocalRelation local && local.supplier() == EmptyLocalSupplier.EMPTY) {
|
||||
// only care about non-grouped aggs might return something (count)
|
||||
if (plan instanceof Aggregate agg && agg.groupings().isEmpty()) {
|
||||
List<Block> emptyBlocks = aggsFromEmpty(ctx.foldCtx(), agg.aggregates());
|
||||
|
|
|
@ -23,7 +23,7 @@ import java.util.Map;
|
|||
|
||||
/**
|
||||
* Replace any evaluation from the inlined aggregation side (right side) to the left side (source) to perform the matching.
|
||||
* In INLINE m = MIN(x) BY a + b the right side contains STATS m = MIN(X) BY a + b.
|
||||
* In INLINESTATS m = MIN(x) BY a + b the right side contains STATS m = MIN(X) BY a + b.
|
||||
* As the grouping key is used to perform the join, the evaluation required for creating it has to be copied to the left side
|
||||
* as well.
|
||||
*/
|
||||
|
|
|
@ -22,6 +22,7 @@ import org.elasticsearch.xpack.esql.plan.logical.Eval;
|
|||
import org.elasticsearch.xpack.esql.plan.logical.Fork;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.Limit;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
|
||||
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
|
||||
|
@ -39,15 +40,18 @@ public final class PruneColumns extends Rule<LogicalPlan, LogicalPlan> {
|
|||
public LogicalPlan apply(LogicalPlan plan) {
|
||||
// track used references
|
||||
var used = plan.outputSet().asBuilder();
|
||||
// track inlinestats' own aggregation output (right-hand side of the join) so that any other plan on the left-hand side of the
|
||||
// inline join won't have its columns pruned due to the lack of "visibility" into the right hand side output/Attributes
|
||||
var inlineJoinRightOutput = new ArrayList<Attribute>();
|
||||
Holder<Boolean> forkPresent = new Holder<>(false);
|
||||
|
||||
// while going top-to-bottom (upstream)
|
||||
var pl = plan.transformDown(p -> {
|
||||
// Note: It is NOT required to do anything special for binary plans like JOINs. It is perfectly fine that transformDown descends
|
||||
// first into the left side, adding all kinds of attributes to the `used` set, and then descends into the right side - even
|
||||
// though the `used` set will contain stuff only used in the left hand side. That's because any attribute that is used in the
|
||||
// left hand side must have been created in the left side as well. Even field attributes belonging to the same index fields will
|
||||
// have different name ids in the left and right hand sides - as in the extreme example
|
||||
// Note: It is NOT required to do anything special for binary plans like JOINs, except INLINESTATS. It is perfectly fine that
|
||||
// transformDown descends first into the left side, adding all kinds of attributes to the `used` set, and then descends into
|
||||
// the right side - even though the `used` set will contain stuff only used in the left hand side. That's because any attribute
|
||||
// that is used in the left hand side must have been created in the left side as well. Even field attributes belonging to the
|
||||
// same index fields will have different name ids in the left and right hand sides - as in the extreme example
|
||||
// `FROM lookup_idx | LOOKUP JOIN lookup_idx ON key_field`.
|
||||
|
||||
// skip nodes that simply pass the input through
|
||||
|
@ -63,6 +67,11 @@ public final class PruneColumns extends Rule<LogicalPlan, LogicalPlan> {
|
|||
return p;
|
||||
}
|
||||
|
||||
// TODO: INLINESTATS unit testing for tracking this set
|
||||
if (p instanceof InlineJoin ij) {
|
||||
inlineJoinRightOutput.addAll(ij.right().outputSet());
|
||||
}
|
||||
|
||||
// remember used
|
||||
boolean recheck;
|
||||
// analyze the unused items against dedicated 'producer' nodes such as Eval and Aggregate
|
||||
|
@ -70,7 +79,8 @@ public final class PruneColumns extends Rule<LogicalPlan, LogicalPlan> {
|
|||
do {
|
||||
recheck = false;
|
||||
if (p instanceof Aggregate aggregate) {
|
||||
var remaining = removeUnused(aggregate.aggregates(), used);
|
||||
// TODO: INLINESTATS https://github.com/elastic/elasticsearch/pull/128917#discussion_r2175162099
|
||||
var remaining = removeUnused(aggregate.aggregates(), used, inlineJoinRightOutput);
|
||||
|
||||
if (remaining != null) {
|
||||
if (remaining.isEmpty()) {
|
||||
|
@ -96,8 +106,19 @@ public final class PruneColumns extends Rule<LogicalPlan, LogicalPlan> {
|
|||
p = aggregate.with(aggregate.groupings(), remaining);
|
||||
}
|
||||
}
|
||||
} else if (p instanceof InlineJoin ij) {// TODO: InlineStats - add unit tests for this IJ removal
|
||||
var remaining = removeUnused(ij.right().output(), used, inlineJoinRightOutput);
|
||||
if (remaining != null) {
|
||||
if (remaining.isEmpty()) {
|
||||
// remove the InlineJoin altogether
|
||||
p = ij.left();
|
||||
recheck = true;
|
||||
}
|
||||
// TODO: InlineStats - prune ONLY the unused output columns from it? In other words, don't perform more aggs
|
||||
// if they will not be used anyway
|
||||
}
|
||||
} else if (p instanceof Eval eval) {
|
||||
var remaining = removeUnused(eval.fields(), used);
|
||||
var remaining = removeUnused(eval.fields(), used, inlineJoinRightOutput);
|
||||
// no fields, no eval
|
||||
if (remaining != null) {
|
||||
if (remaining.isEmpty()) {
|
||||
|
@ -111,7 +132,7 @@ public final class PruneColumns extends Rule<LogicalPlan, LogicalPlan> {
|
|||
// Normally, pruning EsRelation has no effect because InsertFieldExtraction only extracts the required fields, anyway.
|
||||
// However, InsertFieldExtraction can't be currently used in LOOKUP JOIN right index,
|
||||
// it works differently as we extract all fields (other than the join key) that the EsRelation has.
|
||||
var remaining = removeUnused(esr.output(), used);
|
||||
var remaining = removeUnused(esr.output(), used, inlineJoinRightOutput);
|
||||
if (remaining != null) {
|
||||
p = new EsRelation(esr.source(), esr.indexPattern(), esr.indexMode(), esr.indexNameWithModes(), remaining);
|
||||
}
|
||||
|
@ -131,14 +152,15 @@ public final class PruneColumns extends Rule<LogicalPlan, LogicalPlan> {
|
|||
* Prunes attributes from the list not found in the given set.
|
||||
* Returns null if no changed occurred.
|
||||
*/
|
||||
private static <N extends NamedExpression> List<N> removeUnused(List<N> named, AttributeSet.Builder used) {
|
||||
private static <N extends NamedExpression> List<N> removeUnused(List<N> named, AttributeSet.Builder used, List<Attribute> exceptions) {
|
||||
var clone = new ArrayList<>(named);
|
||||
var it = clone.listIterator(clone.size());
|
||||
|
||||
// due to Eval, go in reverse
|
||||
while (it.hasPrevious()) {
|
||||
N prev = it.previous();
|
||||
if (used.contains(prev.toAttribute()) == false) {
|
||||
var attr = prev.toAttribute();
|
||||
if (used.contains(attr) == false && exceptions.contains(attr) == false) {
|
||||
it.remove();
|
||||
} else {
|
||||
used.addAll(prev.references());
|
||||
|
|
|
@ -9,13 +9,13 @@ package org.elasticsearch.xpack.esql.optimizer.rules.logical;
|
|||
|
||||
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.local.EmptyLocalSupplier;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
|
||||
|
||||
public final class PruneEmptyPlans extends OptimizerRules.OptimizerRule<UnaryPlan> {
|
||||
|
||||
public static LogicalPlan skipPlan(UnaryPlan plan) {
|
||||
return new LocalRelation(plan.source(), plan.output(), LocalSupplier.EMPTY);
|
||||
return new LocalRelation(plan.source(), plan.output(), EmptyLocalSupplier.EMPTY);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -11,8 +11,8 @@ import org.elasticsearch.compute.data.BlockUtils;
|
|||
import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.Row;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.local.CopyingLocalSupplier;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
|
||||
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
@ -29,6 +29,6 @@ public final class ReplaceRowAsLocalRelation extends OptimizerRules.Parameterize
|
|||
List<Object> values = new ArrayList<>(fields.size());
|
||||
fields.forEach(f -> values.add(f.child().fold(context.foldCtx())));
|
||||
var blocks = BlockUtils.fromListRow(PlannerUtils.NON_BREAKING_BLOCK_FACTORY, values);
|
||||
return new LocalRelation(row.source(), row.output(), LocalSupplier.of(blocks));
|
||||
return new LocalRelation(row.source(), row.output(), new CopyingLocalSupplier(blocks));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -9,13 +9,13 @@ package org.elasticsearch.xpack.esql.optimizer.rules.logical;
|
|||
|
||||
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.local.EmptyLocalSupplier;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
|
||||
|
||||
public final class SkipQueryOnEmptyMappings extends OptimizerRules.OptimizerRule<EsRelation> {
|
||||
|
||||
@Override
|
||||
protected LogicalPlan rule(EsRelation plan) {
|
||||
return plan.concreteIndices().isEmpty() ? new LocalRelation(plan.source(), plan.output(), LocalSupplier.EMPTY) : plan;
|
||||
return plan.concreteIndices().isEmpty() ? new LocalRelation(plan.source(), plan.output(), EmptyLocalSupplier.EMPTY) : plan;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,7 +28,10 @@ import org.elasticsearch.xpack.esql.plan.logical.inference.Completion;
|
|||
import org.elasticsearch.xpack.esql.plan.logical.inference.Rerank;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.join.Join;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.local.CopyingLocalSupplier;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.local.EmptyLocalSupplier;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.local.EsqlProject;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.local.ImmediateLocalSupplier;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
|
||||
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
|
||||
import org.elasticsearch.xpack.esql.plan.physical.DissectExec;
|
||||
|
@ -65,6 +68,7 @@ public class PlanWritables {
|
|||
List<NamedWriteableRegistry.Entry> entries = new ArrayList<>();
|
||||
entries.addAll(logical());
|
||||
entries.addAll(physical());
|
||||
entries.addAll(others());
|
||||
return entries;
|
||||
}
|
||||
|
||||
|
@ -124,4 +128,8 @@ public class PlanWritables {
|
|||
TopNExec.ENTRY
|
||||
);
|
||||
}
|
||||
|
||||
public static List<NamedWriteableRegistry.Entry> others() {
|
||||
return List.of(CopyingLocalSupplier.ENTRY, ImmediateLocalSupplier.ENTRY, EmptyLocalSupplier.ENTRY);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@ import java.util.ArrayList;
|
|||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
import static java.util.Collections.emptyList;
|
||||
import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;
|
||||
|
||||
/**
|
||||
|
@ -101,6 +102,9 @@ public class InlineStats extends UnaryPlan implements NamedWriteable, SurrogateL
|
|||
for (Expression g : groupings) {
|
||||
namedGroupings.add(Expressions.attribute(g));
|
||||
}
|
||||
// last named grouping wins, just like it happens for regular STATS
|
||||
// ie BY x = field_1, x = field_2, the grouping is actually performed on second x (field_2)
|
||||
namedGroupings = mergeOutputAttributes(namedGroupings, emptyList());
|
||||
|
||||
List<Attribute> leftFields = new ArrayList<>(groupings.size());
|
||||
List<Attribute> rightFields = new ArrayList<>(groupings.size());
|
||||
|
|
|
@ -13,10 +13,10 @@ import org.elasticsearch.compute.data.Block;
|
|||
import org.elasticsearch.compute.data.BlockUtils;
|
||||
import org.elasticsearch.xpack.esql.core.expression.Alias;
|
||||
import org.elasticsearch.xpack.esql.core.expression.Attribute;
|
||||
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
|
||||
import org.elasticsearch.xpack.esql.core.expression.Literal;
|
||||
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
|
||||
import org.elasticsearch.xpack.esql.core.tree.Source;
|
||||
import org.elasticsearch.xpack.esql.core.util.Holder;
|
||||
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.Eval;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
|
||||
|
@ -56,16 +56,9 @@ public class InlineJoin extends Join {
|
|||
return sourcePlan.replaceChild(new StubRelation(sourcePlan.source(), target.output()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Replaces the stubbed source with the actual source.
|
||||
*/
|
||||
public static LogicalPlan replaceStub(LogicalPlan source, LogicalPlan stubbed) {
|
||||
return stubbed.transformUp(StubRelation.class, stubRelation -> source);
|
||||
}
|
||||
|
||||
/**
|
||||
* TODO: perform better planning
|
||||
* Keep the join in place or replace it with a projection in case no grouping is necessary.
|
||||
* Keep the join in place or replace it with an Eval in case no grouping is necessary.
|
||||
*/
|
||||
public static LogicalPlan inlineData(InlineJoin target, LocalRelation data) {
|
||||
if (target.config().matchFields().isEmpty()) {
|
||||
|
@ -82,6 +75,70 @@ public class InlineJoin extends Join {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Replaces the stubbed source with the actual source.
|
||||
* NOTE: this will replace all {@link StubRelation}s found with the source and the method is meant to be used to replace one node only
|
||||
* when being called on a plan that has only ONE StubRelation in it.
|
||||
*/
|
||||
public static LogicalPlan replaceStub(LogicalPlan source, LogicalPlan stubbed) {
|
||||
// here we could have used stubbed.transformUp(StubRelation.class, stubRelation -> source)
|
||||
// but transformUp skips changing a node if its transformed variant is equal to its original variant.
|
||||
// A StubRelation can contain in its output ReferenceAttributes which do not use NameIds for equality, but only names and
|
||||
// two ReferenceAttributes with the same name are equal and the transformation will not be applied.
|
||||
return stubbed.transformUp(UnaryPlan.class, up -> {
|
||||
if (up.child() instanceof StubRelation) {
|
||||
return up.replaceChild(source);
|
||||
}
|
||||
return up;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @param stubReplacedSubPlan - the completed / "destubbed" right-hand side of the bottommost InlineJoin in the plan. For example:
|
||||
* Aggregate[[],[MAX(x{r}#99,true[BOOLEAN]) AS y#102]]
|
||||
* \_Limit[1000[INTEGER],false]
|
||||
* \_LocalRelation[[x{r}#99],[IntVectorBlock[vector=ConstantIntVector[positions=1, value=1]]]]
|
||||
* @param originalSubPlan - the original (unchanged) right-hand side of the bottommost InlineJoin in the plan. For example:
|
||||
* Aggregate[[],[MAX(x{r}#99,true[BOOLEAN]) AS y#102]]
|
||||
* \_StubRelation[[x{r}#99]]]
|
||||
*/
|
||||
public record LogicalPlanTuple(LogicalPlan stubReplacedSubPlan, LogicalPlan originalSubPlan) {}
|
||||
|
||||
/**
|
||||
* Finds the "first" (closest to the source command or bottom up in the tree) {@link InlineJoin}, replaces the {@link StubRelation}
|
||||
* of the right-hand side with left-hand side's source and returns a tuple.
|
||||
*
|
||||
* Original optimized plan:
|
||||
* Limit[1000[INTEGER],true]
|
||||
* \_InlineJoin[LEFT,[],[],[]]
|
||||
* |_Limit[1000[INTEGER],false]
|
||||
* | \_LocalRelation[[x{r}#99],[IntVectorBlock[vector=ConstantIntVector[positions=1, value=1]]]]
|
||||
* \_Aggregate[[],[MAX(x{r}#99,true[BOOLEAN]) AS y#102]]
|
||||
* \_StubRelation[[x{r}#99]]
|
||||
*
|
||||
* Takes the right hand side:
|
||||
* Aggregate[[],[MAX(x{r}#99,true[BOOLEAN]) AS y#102]]
|
||||
* \_StubRelation[[x{r}#99]]]
|
||||
*
|
||||
* And uses the left-hand side's source as its source:
|
||||
* Aggregate[[],[MAX(x{r}#99,true[BOOLEAN]) AS y#102]]
|
||||
* \_Limit[1000[INTEGER],false]
|
||||
* \_LocalRelation[[x{r}#99],[IntVectorBlock[vector=ConstantIntVector[positions=1, value=1]]]]
|
||||
*/
|
||||
public static LogicalPlanTuple firstSubPlan(LogicalPlan optimizedPlan) {
|
||||
Holder<LogicalPlanTuple> subPlan = new Holder<>();
|
||||
// Collect the first inlinejoin (bottom up in the tree)
|
||||
optimizedPlan.forEachUp(InlineJoin.class, ij -> {
|
||||
// extract the right side of the plan and replace its source
|
||||
if (subPlan.get() == null && ij.right().anyMatch(p -> p instanceof StubRelation)) {
|
||||
var p = replaceStub(ij.left(), ij.right());
|
||||
p.setOptimized();
|
||||
subPlan.set(new LogicalPlanTuple(p, ij.right()));
|
||||
}
|
||||
});
|
||||
return subPlan.get();
|
||||
}
|
||||
|
||||
public InlineJoin(Source source, LogicalPlan left, LogicalPlan right, JoinConfig config) {
|
||||
super(source, left, right, config);
|
||||
}
|
||||
|
@ -139,20 +196,12 @@ public class InlineJoin extends Join {
|
|||
JoinType joinType = config().type();
|
||||
List<Attribute> output;
|
||||
if (LEFT.equals(joinType)) {
|
||||
AttributeSet rightFields = AttributeSet.of(config().rightFields());
|
||||
List<Attribute> leftOutputWithoutMatchFields = new ArrayList<>();
|
||||
// at this point "left" part of the join contains all the attributes that represent the input of the join
|
||||
// including any aliasing (evals) of expressions used as grouping attributes (or join "match fields") in the join itself
|
||||
for (Attribute attr : left().output()) {
|
||||
if (rightFields.contains(attr) == false) {
|
||||
// the aforementioned groupings expressions or aliasing are removed from the left set of attributes
|
||||
leftOutputWithoutMatchFields.add(attr);
|
||||
}
|
||||
}
|
||||
// the actual output of the join will place the left hand side attributes (excluding any aliasing of the groupings)
|
||||
// as first columns in the output followed by whatever the right hand side of join adds in this order: aggregates first,
|
||||
// followed by groupings (this order should be preserved inside the rightFields() output)
|
||||
output = mergeOutputAttributes(right, leftOutputWithoutMatchFields);
|
||||
List<Attribute> leftOutputWithoutKeys = left.stream().filter(attr -> config().leftFields().contains(attr) == false).toList();
|
||||
List<Attribute> rightWithAppendedKeys = new ArrayList<>(right);
|
||||
rightWithAppendedKeys.removeAll(config().rightFields());
|
||||
rightWithAppendedKeys.addAll(config().leftFields());
|
||||
|
||||
output = mergeOutputAttributes(rightWithAppendedKeys, leftOutputWithoutKeys);
|
||||
} else {
|
||||
throw new IllegalArgumentException(joinType.joinName() + " unsupported");
|
||||
}
|
||||
|
|
|
@ -0,0 +1,105 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.esql.plan.logical.local;
|
||||
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.compute.data.Block;
|
||||
import org.elasticsearch.compute.data.BlockUtils;
|
||||
import org.elasticsearch.xpack.esql.optimizer.rules.logical.ReplaceRowAsLocalRelation;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.InlineStats;
|
||||
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
|
||||
import org.elasticsearch.xpack.esql.session.EsqlSession;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
|
||||
/**
|
||||
* A {@link LocalSupplier} that allways creates a new copy of the {@link Block}s initially provided at creation time.
|
||||
* This is created specifically for {@link InlineStats} usage in {@link EsqlSession} for queries that use ROW command.
|
||||
*
|
||||
* The ROW which gets replaced by {@link ReplaceRowAsLocalRelation} with a {@link LocalRelation} will have its blocks
|
||||
* used (and released) at least twice:
|
||||
* - the {@link LocalRelation} from the left-hand side is used as a source for the right-hand side
|
||||
* - the same {@link LocalRelation} is then used to continue the execution of the query on the left-hand side
|
||||
*
|
||||
* It delegates all its operations to {@link ImmediateLocalSupplier} and, to prevent the double release, it will always
|
||||
* create a deep copy of the blocks received in the constructor initially.
|
||||
*
|
||||
* Example with the flow and the blocks reuse for a query like "row x = 1 | inlinestats y = max(x)"
|
||||
* Step 1:
|
||||
* Limit[1000[INTEGER],true]
|
||||
* \_InlineJoin[LEFT,[],[],[]]
|
||||
* |_Limit[1000[INTEGER],false]
|
||||
* | \_LocalRelation[[x{r}#99],[IntVectorBlock[vector=ConstantIntVector[positions=1, value=1]]]]
|
||||
* \_Aggregate[[],[MAX(x{r}#99,true[BOOLEAN]) AS y#102]]
|
||||
* \_StubRelation[[x{r}#99]]
|
||||
*
|
||||
* Step 2:
|
||||
* Aggregate[[],[MAX(x{r}#99,true[BOOLEAN]) AS y#102]]
|
||||
* \_Limit[1000[INTEGER],false]
|
||||
* \_LocalRelation[[x{r}#99],[IntVectorBlock[vector=ConstantIntVector[positions=1, value=1]]]]
|
||||
*
|
||||
* Step 3:
|
||||
* Limit[1000[INTEGER],true]
|
||||
* \_Eval[[1[INTEGER] AS y#102]]
|
||||
* \_Limit[1000[INTEGER],false]
|
||||
* \_LocalRelation[[x{r}#99],[IntVectorBlock[vector=ConstantIntVector[positions=1, value=1]]]]
|
||||
*/
|
||||
public class CopyingLocalSupplier implements LocalSupplier {
|
||||
|
||||
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
|
||||
LocalSupplier.class,
|
||||
"CopyingSupplier",
|
||||
CopyingLocalSupplier::new
|
||||
);
|
||||
|
||||
private final ImmediateLocalSupplier delegate;
|
||||
|
||||
public CopyingLocalSupplier(Block[] blocks) {
|
||||
delegate = new ImmediateLocalSupplier(blocks);
|
||||
}
|
||||
|
||||
public CopyingLocalSupplier(StreamInput in) throws IOException {
|
||||
delegate = new ImmediateLocalSupplier(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Block[] get() {
|
||||
Block[] blockCopies = new Block[delegate.blocks.length];
|
||||
for (int i = 0; i < blockCopies.length; i++) {
|
||||
blockCopies[i] = BlockUtils.deepCopyOf(delegate.blocks[i], PlannerUtils.NON_BREAKING_BLOCK_FACTORY);
|
||||
}
|
||||
return blockCopies;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
delegate.writeTo(out);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getWriteableName() {
|
||||
return ENTRY.name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj == null || obj.getClass() != getClass()) {
|
||||
return false;
|
||||
}
|
||||
CopyingLocalSupplier other = (CopyingLocalSupplier) obj;
|
||||
return Arrays.equals(delegate.blocks, other.delegate.blocks);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return delegate.hashCode();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,53 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.esql.plan.logical.local;
|
||||
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.compute.data.Block;
|
||||
import org.elasticsearch.compute.data.BlockUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class EmptyLocalSupplier implements LocalSupplier {
|
||||
|
||||
public static final LocalSupplier EMPTY = new EmptyLocalSupplier();
|
||||
public static final String NAME = "EmptySupplier";
|
||||
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(LocalSupplier.class, NAME, in -> EMPTY);
|
||||
|
||||
private EmptyLocalSupplier() {}
|
||||
|
||||
@Override
|
||||
public String getWriteableName() {
|
||||
return NAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Block[] get() {
|
||||
return BlockUtils.NO_BLOCKS;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "EMPTY";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
return obj == EMPTY;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
}
|
|
@ -7,8 +7,11 @@
|
|||
|
||||
package org.elasticsearch.xpack.esql.plan.logical.local;
|
||||
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.compute.data.Block;
|
||||
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
|
||||
import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -18,12 +21,23 @@ import java.util.Arrays;
|
|||
* A {@link LocalSupplier} that contains already filled {@link Block}s.
|
||||
*/
|
||||
public class ImmediateLocalSupplier implements LocalSupplier {
|
||||
private final Block[] blocks;
|
||||
|
||||
public ImmediateLocalSupplier(Block[] blocks) {
|
||||
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
|
||||
LocalSupplier.class,
|
||||
"ImmediateSupplier",
|
||||
ImmediateLocalSupplier::new
|
||||
);
|
||||
|
||||
final Block[] blocks;
|
||||
|
||||
ImmediateLocalSupplier(Block[] blocks) {
|
||||
this.blocks = blocks;
|
||||
}
|
||||
|
||||
ImmediateLocalSupplier(StreamInput in) throws IOException {
|
||||
this(((PlanStreamInput) in).readCachedBlockArray());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Block[] get() {
|
||||
return blocks;
|
||||
|
@ -52,4 +66,9 @@ public class ImmediateLocalSupplier implements LocalSupplier {
|
|||
public int hashCode() {
|
||||
return Arrays.hashCode(blocks);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getWriteableName() {
|
||||
return ENTRY.name;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,6 +6,7 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.esql.plan.logical.local;
|
||||
|
||||
import org.elasticsearch.TransportVersions;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
|
@ -15,6 +16,7 @@ import org.elasticsearch.xpack.esql.core.tree.Source;
|
|||
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.LeafPlan;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
|
||||
import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
@ -39,14 +41,26 @@ public class LocalRelation extends LeafPlan {
|
|||
public LocalRelation(StreamInput in) throws IOException {
|
||||
super(Source.readFrom((PlanStreamInput) in));
|
||||
this.output = in.readNamedWriteableCollectionAsList(Attribute.class);
|
||||
this.supplier = LocalSupplier.readFrom((PlanStreamInput) in);
|
||||
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOCAL_RELATION_WITH_NEW_BLOCKS)) {
|
||||
this.supplier = in.readNamedWriteable(LocalSupplier.class);
|
||||
} else {
|
||||
this.supplier = LocalSourceExec.readLegacyLocalSupplierFrom((PlanStreamInput) in);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
source().writeTo(out);
|
||||
out.writeNamedWriteableCollection(output);
|
||||
supplier.writeTo(out);
|
||||
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOCAL_RELATION_WITH_NEW_BLOCKS)) {
|
||||
out.writeNamedWriteable(supplier);
|
||||
} else {
|
||||
if (supplier == EmptyLocalSupplier.EMPTY) {
|
||||
out.writeVInt(0);
|
||||
} else {// here we can only have an ImmediateLocalSupplier as this was the only implementation apart from EMPTY
|
||||
((ImmediateLocalSupplier) supplier).writeTo(out);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -7,13 +7,10 @@
|
|||
|
||||
package org.elasticsearch.xpack.esql.plan.logical.local;
|
||||
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteable;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.compute.data.Block;
|
||||
import org.elasticsearch.compute.data.BlockUtils;
|
||||
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
/**
|
||||
|
@ -25,41 +22,10 @@ import java.util.function.Supplier;
|
|||
* {@link UnsupportedOperationException}.
|
||||
* </p>
|
||||
*/
|
||||
public interface LocalSupplier extends Supplier<Block[]>, Writeable {
|
||||
|
||||
LocalSupplier EMPTY = new LocalSupplier() {
|
||||
@Override
|
||||
public Block[] get() {
|
||||
return BlockUtils.NO_BLOCKS;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "EMPTY";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeVInt(0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
return obj == EMPTY;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
public interface LocalSupplier extends Supplier<Block[]>, NamedWriteable {
|
||||
|
||||
static LocalSupplier of(Block[] blocks) {
|
||||
return new ImmediateLocalSupplier(blocks);
|
||||
}
|
||||
|
||||
static LocalSupplier readFrom(PlanStreamInput in) throws IOException {
|
||||
Block[] blocks = in.readCachedBlockArray();
|
||||
return blocks.length == 0 ? EMPTY : of(blocks);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,6 +23,8 @@ import java.util.List;
|
|||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;
|
||||
|
||||
public class HashJoinExec extends BinaryExec implements EstimatesRowSize {
|
||||
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
|
||||
PhysicalPlan.class,
|
||||
|
@ -107,11 +109,12 @@ public class HashJoinExec extends BinaryExec implements EstimatesRowSize {
|
|||
@Override
|
||||
public List<Attribute> output() {
|
||||
if (lazyOutput == null) {
|
||||
lazyOutput = new ArrayList<>(left().output());
|
||||
var rightFieldNames = rightFields.stream().map(Attribute::name).toList();
|
||||
lazyOutput.removeIf(a -> rightFieldNames.contains(a.name()));
|
||||
lazyOutput.addAll(addedFields);
|
||||
lazyOutput.addAll(rightFields);
|
||||
List<Attribute> leftOutputWithoutKeys = left().output().stream().filter(attr -> leftFields.contains(attr) == false).toList();
|
||||
List<Attribute> rightWithAppendedKeys = new ArrayList<>(right().output());
|
||||
rightWithAppendedKeys.removeAll(rightFields);
|
||||
rightWithAppendedKeys.addAll(leftFields);
|
||||
|
||||
lazyOutput = mergeOutputAttributes(rightWithAppendedKeys, leftOutputWithoutKeys);
|
||||
}
|
||||
return lazyOutput;
|
||||
}
|
||||
|
|
|
@ -7,13 +7,17 @@
|
|||
|
||||
package org.elasticsearch.xpack.esql.plan.physical;
|
||||
|
||||
import org.elasticsearch.TransportVersions;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.compute.data.Block;
|
||||
import org.elasticsearch.xpack.esql.core.expression.Attribute;
|
||||
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
|
||||
import org.elasticsearch.xpack.esql.core.tree.Source;
|
||||
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.local.EmptyLocalSupplier;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.local.ImmediateLocalSupplier;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -39,14 +43,41 @@ public class LocalSourceExec extends LeafExec {
|
|||
public LocalSourceExec(StreamInput in) throws IOException {
|
||||
super(Source.readFrom((PlanStreamInput) in));
|
||||
this.output = in.readNamedWriteableCollectionAsList(Attribute.class);
|
||||
this.supplier = LocalSupplier.readFrom((PlanStreamInput) in);
|
||||
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOCAL_RELATION_WITH_NEW_BLOCKS)) {
|
||||
this.supplier = in.readNamedWriteable(LocalSupplier.class);
|
||||
} else {
|
||||
this.supplier = readLegacyLocalSupplierFrom((PlanStreamInput) in);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Legacy {@link LocalSupplier} deserialization for code that didn't use {@link org.elasticsearch.common.io.stream.NamedWriteable}s
|
||||
* and the {@link LocalSupplier} had only one implementation (the {@link ImmediateLocalSupplier}).
|
||||
*
|
||||
* @param in
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
public static LocalSupplier readLegacyLocalSupplierFrom(PlanStreamInput in) throws IOException {
|
||||
Block[] blocks = in.readCachedBlockArray();
|
||||
return blocks.length == 0 ? EmptyLocalSupplier.EMPTY : LocalSupplier.of(blocks);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
source().writeTo(out);
|
||||
out.writeNamedWriteableCollection(output);
|
||||
supplier.writeTo(out);
|
||||
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOCAL_RELATION_WITH_NEW_BLOCKS)) {
|
||||
out.writeNamedWriteable(supplier);
|
||||
} else {
|
||||
if (supplier == EmptyLocalSupplier.EMPTY) {
|
||||
out.writeVInt(0);
|
||||
} else {
|
||||
// here we can only have an ImmediateLocalSupplier as this was the only implementation apart from EMPTY
|
||||
// for earlier versions
|
||||
((ImmediateLocalSupplier) supplier).writeTo(out);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -25,7 +25,6 @@ import org.elasticsearch.xpack.esql.plan.logical.Sample;
|
|||
import org.elasticsearch.xpack.esql.plan.logical.TopN;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.inference.Rerank;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.join.Join;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.join.JoinConfig;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes;
|
||||
|
@ -207,14 +206,10 @@ public class Mapper {
|
|||
throw new EsqlIllegalArgumentException("unsupported join type [" + config.type() + "]");
|
||||
}
|
||||
|
||||
if (join instanceof InlineJoin) {
|
||||
return new FragmentExec(bp);
|
||||
}
|
||||
|
||||
PhysicalPlan left = map(bp.left());
|
||||
|
||||
// only broadcast joins supported for now - hence push down as a streaming operator
|
||||
if (left instanceof FragmentExec fragment) {
|
||||
if (left instanceof FragmentExec) {
|
||||
return new FragmentExec(bp);
|
||||
}
|
||||
|
||||
|
@ -228,7 +223,7 @@ public class Mapper {
|
|||
config.matchFields(),
|
||||
config.leftFields(),
|
||||
config.rightFields(),
|
||||
join.output()
|
||||
join.rightOutputFields()
|
||||
);
|
||||
}
|
||||
if (right instanceof FragmentExec fragment
|
||||
|
|
|
@ -104,7 +104,6 @@ import org.elasticsearch.xpack.esql.telemetry.PlanTelemetry;
|
|||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
@ -114,6 +113,7 @@ import java.util.stream.Collectors;
|
|||
import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
|
||||
import static org.elasticsearch.xpack.esql.core.tree.Source.EMPTY;
|
||||
import static org.elasticsearch.xpack.esql.core.util.StringUtils.WILDCARD;
|
||||
import static org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin.firstSubPlan;
|
||||
|
||||
public class EsqlSession {
|
||||
|
||||
|
@ -217,8 +217,8 @@ public class EsqlSession {
|
|||
LogicalPlan optimizedPlan,
|
||||
ActionListener<Result> listener
|
||||
) {
|
||||
PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan, request);
|
||||
if (explainMode) {
|
||||
if (explainMode) {// TODO: INLINESTATS come back to the explain mode branch and reevaluate
|
||||
PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan, request);
|
||||
String physicalPlanString = physicalPlan.toString();
|
||||
List<Attribute> fields = List.of(
|
||||
new ReferenceAttribute(EMPTY, "role", DataType.KEYWORD),
|
||||
|
@ -231,44 +231,30 @@ public class EsqlSession {
|
|||
values.add(List.of("coordinator", "optimizedPhysicalPlan", physicalPlanString));
|
||||
var blocks = BlockUtils.fromList(PlannerUtils.NON_BREAKING_BLOCK_FACTORY, values);
|
||||
physicalPlan = new LocalSourceExec(Source.EMPTY, fields, LocalSupplier.of(blocks));
|
||||
planRunner.run(physicalPlan, listener);
|
||||
} else {
|
||||
// TODO: this could be snuck into the underlying listener
|
||||
EsqlCCSUtils.updateExecutionInfoAtEndOfPlanning(executionInfo);
|
||||
// execute any potential subplans
|
||||
executeSubPlans(optimizedPlan, planRunner, executionInfo, request, listener);
|
||||
}
|
||||
// TODO: this could be snuck into the underlying listener
|
||||
EsqlCCSUtils.updateExecutionInfoAtEndOfPlanning(executionInfo);
|
||||
// execute any potential subplans
|
||||
executeSubPlans(physicalPlan, planRunner, executionInfo, request, listener);
|
||||
}
|
||||
|
||||
private record PlanTuple(PhysicalPlan physical, LogicalPlan logical) {}
|
||||
|
||||
private void executeSubPlans(
|
||||
PhysicalPlan physicalPlan,
|
||||
LogicalPlan optimizedPlan,
|
||||
PlanRunner runner,
|
||||
EsqlExecutionInfo executionInfo,
|
||||
EsqlQueryRequest request,
|
||||
ActionListener<Result> listener
|
||||
) {
|
||||
List<PlanTuple> subplans = new ArrayList<>();
|
||||
|
||||
// Currently the inlinestats are limited and supported as streaming operators, thus present inside the fragment as logical plans
|
||||
// Below they get collected, translated into a separate, coordinator based plan and the results 'broadcasted' as a local relation
|
||||
physicalPlan.forEachUp(FragmentExec.class, f -> {
|
||||
f.fragment().forEachUp(InlineJoin.class, ij -> {
|
||||
// extract the right side of the plan and replace its source
|
||||
LogicalPlan subplan = InlineJoin.replaceStub(ij.left(), ij.right());
|
||||
// mark the new root node as optimized
|
||||
subplan.setOptimized();
|
||||
PhysicalPlan subqueryPlan = logicalPlanToPhysicalPlan(subplan, request);
|
||||
subplans.add(new PlanTuple(subqueryPlan, ij.right()));
|
||||
});
|
||||
});
|
||||
|
||||
Iterator<PlanTuple> iterator = subplans.iterator();
|
||||
var subPlan = firstSubPlan(optimizedPlan);
|
||||
|
||||
// TODO: merge into one method
|
||||
if (subplans.size() > 0) {
|
||||
if (subPlan != null) {
|
||||
// code-path to execute subplans
|
||||
executeSubPlan(new DriverCompletionInfo.Accumulator(), physicalPlan, iterator, executionInfo, runner, listener);
|
||||
executeSubPlan(new DriverCompletionInfo.Accumulator(), optimizedPlan, subPlan, executionInfo, runner, request, listener);
|
||||
} else {
|
||||
PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan, request);
|
||||
// execute main plan
|
||||
runner.run(physicalPlan, listener);
|
||||
}
|
||||
|
@ -276,40 +262,47 @@ public class EsqlSession {
|
|||
|
||||
private void executeSubPlan(
|
||||
DriverCompletionInfo.Accumulator completionInfoAccumulator,
|
||||
PhysicalPlan plan,
|
||||
Iterator<PlanTuple> subPlanIterator,
|
||||
LogicalPlan optimizedPlan,
|
||||
InlineJoin.LogicalPlanTuple subPlans,
|
||||
EsqlExecutionInfo executionInfo,
|
||||
PlanRunner runner,
|
||||
EsqlQueryRequest request,
|
||||
ActionListener<Result> listener
|
||||
) {
|
||||
PlanTuple tuple = subPlanIterator.next();
|
||||
LOGGER.debug("Executing subplan:\n{}", subPlans.stubReplacedSubPlan());
|
||||
// Create a physical plan out of the logical sub-plan
|
||||
var physicalSubPlan = logicalPlanToPhysicalPlan(subPlans.stubReplacedSubPlan(), request);
|
||||
|
||||
runner.run(tuple.physical, listener.delegateFailureAndWrap((next, result) -> {
|
||||
runner.run(physicalSubPlan, listener.delegateFailureAndWrap((next, result) -> {
|
||||
try {
|
||||
// Translate the subquery into a separate, coordinator based plan and the results 'broadcasted' as a local relation
|
||||
completionInfoAccumulator.accumulate(result.completionInfo());
|
||||
LocalRelation resultWrapper = resultToPlan(tuple.logical, result);
|
||||
LocalRelation resultWrapper = resultToPlan(subPlans.stubReplacedSubPlan(), result);
|
||||
|
||||
// replace the original logical plan with the backing result
|
||||
PhysicalPlan newPlan = plan.transformUp(FragmentExec.class, f -> {
|
||||
LogicalPlan frag = f.fragment();
|
||||
return f.withFragment(
|
||||
frag.transformUp(
|
||||
InlineJoin.class,
|
||||
ij -> ij.right() == tuple.logical ? InlineJoin.inlineData(ij, resultWrapper) : ij
|
||||
)
|
||||
);
|
||||
});
|
||||
LogicalPlan newLogicalPlan = optimizedPlan.transformUp(
|
||||
InlineJoin.class,
|
||||
// use object equality since the right-hand side shouldn't have changed in the optimizedPlan at this point
|
||||
// and equals would have ignored name IDs anyway
|
||||
ij -> ij.right() == subPlans.originalSubPlan() ? InlineJoin.inlineData(ij, resultWrapper) : ij
|
||||
);
|
||||
// TODO: INLINESTATS can we do better here and further optimize the plan AFTER one of the subplans executed?
|
||||
newLogicalPlan.setOptimized();
|
||||
LOGGER.debug("Plan after previous subplan execution:\n{}", newLogicalPlan);
|
||||
// look for the next inlinejoin plan
|
||||
var newSubPlan = firstSubPlan(newLogicalPlan);
|
||||
|
||||
if (subPlanIterator.hasNext() == false) {
|
||||
runner.run(newPlan, next.delegateFailureAndWrap((finalListener, finalResult) -> {
|
||||
if (newSubPlan == null) {// run the final "main" plan
|
||||
LOGGER.debug("Executing final plan:\n{}", newLogicalPlan);
|
||||
var newPhysicalPlan = logicalPlanToPhysicalPlan(newLogicalPlan, request);
|
||||
runner.run(newPhysicalPlan, next.delegateFailureAndWrap((finalListener, finalResult) -> {
|
||||
completionInfoAccumulator.accumulate(finalResult.completionInfo());
|
||||
finalListener.onResponse(
|
||||
new Result(finalResult.schema(), finalResult.pages(), completionInfoAccumulator.finish(), executionInfo)
|
||||
);
|
||||
}));
|
||||
} else {
|
||||
// continue executing the subplans
|
||||
executeSubPlan(completionInfoAccumulator, newPlan, subPlanIterator, executionInfo, runner, next);
|
||||
} else {// continue executing the subplans
|
||||
executeSubPlan(completionInfoAccumulator, newLogicalPlan, newSubPlan, executionInfo, runner, request, listener);
|
||||
}
|
||||
} finally {
|
||||
Releasables.closeExpectNoException(Releasables.wrap(Iterators.map(result.pages().iterator(), p -> p::releaseBlocks)));
|
||||
|
@ -317,7 +310,7 @@ public class EsqlSession {
|
|||
}));
|
||||
}
|
||||
|
||||
private LocalRelation resultToPlan(LogicalPlan plan, Result result) {
|
||||
private static LocalRelation resultToPlan(LogicalPlan plan, Result result) {
|
||||
List<Page> pages = result.pages();
|
||||
List<Attribute> schema = result.schema();
|
||||
// if (pages.size() > 1) {
|
||||
|
|
|
@ -26,7 +26,7 @@ public class SessionUtils {
|
|||
// Limit ourselves to 1mb of results similar to LOOKUP for now.
|
||||
long bytesUsed = pages.stream().mapToLong(Page::ramBytesUsedByBlocks).sum();
|
||||
if (bytesUsed > ByteSizeValue.ofMb(1).getBytes()) {
|
||||
throw new IllegalArgumentException("first phase result too large [" + ByteSizeValue.ofBytes(bytesUsed) + "] > 1mb");
|
||||
throw new IllegalArgumentException("sub-plan execution results too large [" + ByteSizeValue.ofBytes(bytesUsed) + "] > 1mb");
|
||||
}
|
||||
int positionCount = pages.stream().mapToInt(Page::getPositionCount).sum();
|
||||
Block.Builder[] builders = new Block.Builder[schema.size()];
|
||||
|
|
|
@ -4095,6 +4095,21 @@ public class AnalyzerTests extends ESTestCase {
|
|||
assertEquals("test*", esRelation.indexPattern());
|
||||
}
|
||||
|
||||
public void testGroupingOverridesInStats() {
|
||||
verifyUnsupported("""
|
||||
from test
|
||||
| stats MIN(salary) BY x = languages, x = x + 1
|
||||
""", "Found 1 problem\n" + "line 2:43: Unknown column [x]", "mapping-default.json");
|
||||
}
|
||||
|
||||
public void testGroupingOverridesInInlinestats() {
|
||||
assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
|
||||
verifyUnsupported("""
|
||||
from test
|
||||
| inlinestats MIN(salary) BY x = languages, x = x + 1
|
||||
""", "Found 1 problem\n" + "line 2:49: Unknown column [x]", "mapping-default.json");
|
||||
}
|
||||
|
||||
private void verifyNameAndType(String actualName, DataType actualType, String expectedName, DataType expectedType) {
|
||||
assertEquals(expectedName, actualName);
|
||||
assertEquals(expectedType, actualType);
|
||||
|
|
|
@ -52,9 +52,9 @@ import org.elasticsearch.xpack.esql.plan.logical.MvExpand;
|
|||
import org.elasticsearch.xpack.esql.plan.logical.Project;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.Row;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.local.EmptyLocalSupplier;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.local.EsqlProject;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
|
||||
import org.elasticsearch.xpack.esql.stats.SearchStats;
|
||||
import org.hamcrest.Matchers;
|
||||
import org.junit.BeforeClass;
|
||||
|
@ -681,7 +681,7 @@ public class LocalLogicalPlanOptimizerTests extends ESTestCase {
|
|||
var plan = localPlan("FROM test | WHERE TO_LOWER(TO_UPPER(first_name)) RLIKE \"VALÜ*\"");
|
||||
|
||||
var local = as(plan, LocalRelation.class);
|
||||
assertThat(local.supplier(), equalTo(LocalSupplier.EMPTY));
|
||||
assertThat(local.supplier(), equalTo(EmptyLocalSupplier.EMPTY));
|
||||
}
|
||||
|
||||
// same plan as in testReplaceUpperStringCasingWithInsensitiveRLike, but with LIKE instead of RLIKE
|
||||
|
@ -720,7 +720,7 @@ public class LocalLogicalPlanOptimizerTests extends ESTestCase {
|
|||
var plan = localPlan("FROM test | WHERE TO_LOWER(TO_UPPER(first_name)) LIKE \"VALÜ*\"");
|
||||
|
||||
var local = as(plan, LocalRelation.class);
|
||||
assertThat(local.supplier(), equalTo(LocalSupplier.EMPTY));
|
||||
assertThat(local.supplier(), equalTo(EmptyLocalSupplier.EMPTY));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -780,7 +780,7 @@ public class LocalLogicalPlanOptimizerTests extends ESTestCase {
|
|||
|
||||
private LocalRelation asEmptyRelation(Object o) {
|
||||
var empty = as(o, LocalRelation.class);
|
||||
assertThat(empty.supplier(), is(LocalSupplier.EMPTY));
|
||||
assertThat(empty.supplier(), is(EmptyLocalSupplier.EMPTY));
|
||||
return empty;
|
||||
}
|
||||
|
||||
|
|
|
@ -128,9 +128,9 @@ import org.elasticsearch.xpack.esql.plan.logical.join.Join;
|
|||
import org.elasticsearch.xpack.esql.plan.logical.join.JoinConfig;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.local.EmptyLocalSupplier;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.local.EsqlProject;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
|
@ -1238,7 +1238,7 @@ public class LogicalPlanOptimizerTests extends AbstractLogicalPlanOptimizerTests
|
|||
var rule = new PushDownAndCombineLimits();
|
||||
|
||||
var leftChild = emptySource();
|
||||
var rightChild = new LocalRelation(Source.EMPTY, List.of(fieldAttribute()), LocalSupplier.EMPTY);
|
||||
var rightChild = new LocalRelation(Source.EMPTY, List.of(fieldAttribute()), EmptyLocalSupplier.EMPTY);
|
||||
assertNotEquals(leftChild, rightChild);
|
||||
|
||||
var joinConfig = new JoinConfig(JoinTypes.LEFT, List.of(), List.of(), List.of());
|
||||
|
@ -3089,7 +3089,7 @@ public class LogicalPlanOptimizerTests extends AbstractLogicalPlanOptimizerTests
|
|||
""");
|
||||
|
||||
var local = as(plan, LocalRelation.class);
|
||||
assertThat(local.supplier(), is(LocalSupplier.EMPTY));
|
||||
assertThat(local.supplier(), is(EmptyLocalSupplier.EMPTY));
|
||||
}
|
||||
|
||||
public void testFoldFromRow() {
|
||||
|
@ -5340,7 +5340,7 @@ public class LogicalPlanOptimizerTests extends AbstractLogicalPlanOptimizerTests
|
|||
""");
|
||||
|
||||
var local = as(plan, LocalRelation.class);
|
||||
assertThat(local.supplier(), equalTo(LocalSupplier.EMPTY));
|
||||
assertThat(local.supplier(), equalTo(EmptyLocalSupplier.EMPTY));
|
||||
assertWarnings(
|
||||
"Line 2:16: evaluation of [a + b] failed, treating result as null. Only first 20 failures recorded.",
|
||||
"Line 2:16: java.lang.IllegalArgumentException: single-value function encountered multi-value"
|
||||
|
@ -6107,7 +6107,7 @@ public class LogicalPlanOptimizerTests extends AbstractLogicalPlanOptimizerTests
|
|||
public void testReplaceStringCasingWithInsensitiveEqualsUpperFalse() {
|
||||
var plan = optimizedPlan("FROM test | WHERE TO_UPPER(first_name) == \"VALÜe\"");
|
||||
var local = as(plan, LocalRelation.class);
|
||||
assertThat(local.supplier(), equalTo(LocalSupplier.EMPTY));
|
||||
assertThat(local.supplier(), equalTo(EmptyLocalSupplier.EMPTY));
|
||||
}
|
||||
|
||||
public void testReplaceStringCasingWithInsensitiveEqualsUpperTrue() {
|
||||
|
@ -6122,7 +6122,7 @@ public class LogicalPlanOptimizerTests extends AbstractLogicalPlanOptimizerTests
|
|||
public void testReplaceStringCasingWithInsensitiveEqualsLowerFalse() {
|
||||
var plan = optimizedPlan("FROM test | WHERE TO_LOWER(first_name) == \"VALÜe\"");
|
||||
var local = as(plan, LocalRelation.class);
|
||||
assertThat(local.supplier(), equalTo(LocalSupplier.EMPTY));
|
||||
assertThat(local.supplier(), equalTo(EmptyLocalSupplier.EMPTY));
|
||||
}
|
||||
|
||||
public void testReplaceStringCasingWithInsensitiveEqualsLowerTrue() {
|
||||
|
@ -7318,7 +7318,7 @@ public class LogicalPlanOptimizerTests extends AbstractLogicalPlanOptimizerTests
|
|||
| LIMIT 12
|
||||
""");
|
||||
var local = as(plan, LocalRelation.class);
|
||||
assertThat(local.supplier(), equalTo(LocalSupplier.EMPTY));
|
||||
assertThat(local.supplier(), equalTo(EmptyLocalSupplier.EMPTY));
|
||||
}
|
||||
|
||||
public void testFunctionNamedParamsAsFunctionArgument() {
|
||||
|
|
|
@ -109,8 +109,8 @@ import org.elasticsearch.xpack.esql.plan.logical.Project;
|
|||
import org.elasticsearch.xpack.esql.plan.logical.TopN;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.join.Join;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.local.EmptyLocalSupplier;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
|
||||
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
|
||||
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
|
||||
import org.elasticsearch.xpack.esql.plan.physical.DissectExec;
|
||||
import org.elasticsearch.xpack.esql.plan.physical.EnrichExec;
|
||||
|
@ -2547,7 +2547,7 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
|
|||
var limit = as(optimized, LimitExec.class);
|
||||
var exchange = as(limit.child(), ExchangeExec.class);
|
||||
var source = as(exchange.child(), LocalSourceExec.class);
|
||||
assertEquals(LocalSupplier.EMPTY, source.supplier());
|
||||
assertEquals(EmptyLocalSupplier.EMPTY, source.supplier());
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -83,7 +83,7 @@ public class PropagateInlineEvalsTests extends ESTestCase {
|
|||
* \_StubRelation[[emp_no{f}#11, languages{f}#14, gender{f}#13, y{r}#10]]
|
||||
*/
|
||||
public void testGroupingAliasingMoved_To_LeftSideOfJoin() {
|
||||
assumeTrue("Requires INLINESTATS", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled());
|
||||
assumeTrue("Requires INLINESTATS", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
|
||||
var plan = plan("""
|
||||
from test
|
||||
| keep emp_no, languages, gender
|
||||
|
@ -126,7 +126,7 @@ public class PropagateInlineEvalsTests extends ESTestCase {
|
|||
* {r}#21]]
|
||||
*/
|
||||
public void testGroupingAliasingMoved_To_LeftSideOfJoin_WithExpression() {
|
||||
assumeTrue("Requires INLINESTATS", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled());
|
||||
assumeTrue("Requires INLINESTATS", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
|
||||
var plan = plan("""
|
||||
from test
|
||||
| keep emp_no, languages, gender, last_name, first_name
|
||||
|
|
|
@ -30,6 +30,7 @@ public abstract class AbstractLogicalPlanSerializationTests<T extends LogicalPla
|
|||
protected final NamedWriteableRegistry getNamedWriteableRegistry() {
|
||||
List<NamedWriteableRegistry.Entry> entries = new ArrayList<>();
|
||||
entries.addAll(PlanWritables.logical());
|
||||
entries.addAll(PlanWritables.others());
|
||||
entries.addAll(ExpressionWritables.aggregates());
|
||||
entries.addAll(ExpressionWritables.allExpressions());
|
||||
return new NamedWriteableRegistry(entries);
|
||||
|
|
|
@ -0,0 +1,33 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.esql.plan.logical.local;
|
||||
|
||||
import org.elasticsearch.TransportVersion;
|
||||
import org.elasticsearch.TransportVersions;
|
||||
import org.elasticsearch.compute.data.Block;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class CopyingLocalSupplierTests extends LocalSupplierTests {
|
||||
|
||||
@Override
|
||||
protected LocalSupplier createTestInstance() {
|
||||
Block[] blocks = randomList(1, 10, LocalSupplierTests::randomBlock).toArray(Block[]::new);
|
||||
return new CopyingLocalSupplier(blocks);
|
||||
}
|
||||
|
||||
protected void assertOnBWCObject(LocalSupplier testInstance, LocalSupplier bwcDeserializedObject, TransportVersion version) {
|
||||
assertNotSame(version.toString(), bwcDeserializedObject, testInstance);
|
||||
if (version.onOrAfter(TransportVersions.ESQL_LOCAL_RELATION_WITH_NEW_BLOCKS)) {
|
||||
assertThat(testInstance, equalTo(bwcDeserializedObject));
|
||||
} else {
|
||||
assertTrue(version.toString(), bwcDeserializedObject instanceof ImmediateLocalSupplier);
|
||||
}
|
||||
assertEquals(version.toString(), testInstance.hashCode(), bwcDeserializedObject.hashCode());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,40 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.esql.plan.logical.local;
|
||||
|
||||
import org.elasticsearch.TransportVersion;
|
||||
import org.elasticsearch.TransportVersions;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class EmptyLocalSupplierTests extends LocalSupplierTests {
|
||||
|
||||
@Override
|
||||
protected LocalSupplier createTestInstance() {
|
||||
return EmptyLocalSupplier.EMPTY;
|
||||
}
|
||||
|
||||
protected void assertOnBWCObject(LocalSupplier testInstance, LocalSupplier bwcDeserializedObject, TransportVersion version) {
|
||||
assertSame(version.toString(), bwcDeserializedObject, testInstance);
|
||||
assertThat(version.toString(), bwcDeserializedObject, equalTo(EmptyLocalSupplier.EMPTY));
|
||||
assertEquals(version.toString(), testInstance.hashCode(), bwcDeserializedObject.hashCode());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void writeTo(BytesStreamOutput output, LocalSupplier instance, TransportVersion version) throws IOException {
|
||||
if (version.onOrAfter(TransportVersions.ESQL_LOCAL_RELATION_WITH_NEW_BLOCKS)) {
|
||||
new PlanStreamOutput(output, null).writeNamedWriteable(instance);
|
||||
} else {
|
||||
output.writeVInt(0);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,28 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.esql.plan.logical.local;
|
||||
|
||||
import org.elasticsearch.TransportVersion;
|
||||
import org.elasticsearch.compute.data.Block;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class ImmediateLocalSupplierTests extends LocalSupplierTests {
|
||||
|
||||
@Override
|
||||
protected LocalSupplier createTestInstance() {
|
||||
Block[] blocks = randomList(1, 10, LocalSupplierTests::randomBlock).toArray(Block[]::new);
|
||||
return new ImmediateLocalSupplier(blocks);
|
||||
}
|
||||
|
||||
protected void assertOnBWCObject(LocalSupplier testInstance, LocalSupplier bwcDeserializedObject, TransportVersion version) {
|
||||
assertNotSame(version.toString(), bwcDeserializedObject, testInstance);
|
||||
assertThat(version.toString(), testInstance, equalTo(bwcDeserializedObject));
|
||||
assertEquals(version.toString(), testInstance.hashCode(), bwcDeserializedObject.hashCode());
|
||||
}
|
||||
}
|
|
@ -8,45 +8,95 @@
|
|||
package org.elasticsearch.xpack.esql.plan.logical.local;
|
||||
|
||||
import org.elasticsearch.TransportVersion;
|
||||
import org.elasticsearch.TransportVersions;
|
||||
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.compute.data.Block;
|
||||
import org.elasticsearch.compute.data.BlockFactory;
|
||||
import org.elasticsearch.compute.data.IntBlock;
|
||||
import org.elasticsearch.test.AbstractWireTestCase;
|
||||
import org.elasticsearch.test.TransportVersionUtils;
|
||||
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
|
||||
import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput;
|
||||
import org.elasticsearch.xpack.esql.plan.PlanWritables;
|
||||
import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.NavigableSet;
|
||||
|
||||
public abstract class LocalSupplierTests extends AbstractWireTestCase<LocalSupplier> {
|
||||
|
||||
private static final NavigableSet<TransportVersion> DEFAULT_BWC_VERSIONS = getAllBWCVersions();
|
||||
|
||||
public class LocalSupplierTests extends AbstractWireTestCase<LocalSupplier> {
|
||||
private static final BlockFactory BLOCK_FACTORY = BlockFactory.getInstance(
|
||||
new NoopCircuitBreaker("noop-esql-breaker"),
|
||||
BigArrays.NON_RECYCLING_INSTANCE
|
||||
);
|
||||
|
||||
private static NavigableSet<TransportVersion> getAllBWCVersions() {
|
||||
return TransportVersionUtils.allReleasedVersions().tailSet(TransportVersions.MINIMUM_COMPATIBLE, true);
|
||||
}
|
||||
|
||||
public final void testBwcSerialization() throws IOException {
|
||||
for (int runs = 0; runs < NUMBER_OF_TEST_RUNS; runs++) {
|
||||
LocalSupplier testInstance = createTestInstance();
|
||||
for (TransportVersion bwcVersion : DEFAULT_BWC_VERSIONS) {
|
||||
assertBwcSerialization(testInstance, bwcVersion);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected final void assertBwcSerialization(LocalSupplier testInstance, TransportVersion version) throws IOException {
|
||||
LocalSupplier deserializedInstance = copyInstance(testInstance, version);
|
||||
assertOnBWCObject(testInstance, deserializedInstance, version);
|
||||
}
|
||||
|
||||
protected abstract void assertOnBWCObject(LocalSupplier testInstance, LocalSupplier bwcDeserializedObject, TransportVersion version);
|
||||
|
||||
@Override
|
||||
protected LocalSupplier copyInstance(LocalSupplier instance, TransportVersion version) throws IOException {
|
||||
try (BytesStreamOutput output = new BytesStreamOutput()) {
|
||||
output.setTransportVersion(version);
|
||||
instance.writeTo(new PlanStreamOutput(output, null));
|
||||
writeTo(output, instance, version);
|
||||
try (StreamInput in = output.bytes().streamInput()) {
|
||||
in.setTransportVersion(version);
|
||||
return LocalSupplier.readFrom(new PlanStreamInput(in, getNamedWriteableRegistry(), null));
|
||||
return readFrom(in, version);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected void writeTo(BytesStreamOutput output, LocalSupplier instance, TransportVersion version) throws IOException {
|
||||
if (version.onOrAfter(TransportVersions.ESQL_LOCAL_RELATION_WITH_NEW_BLOCKS)) {
|
||||
new PlanStreamOutput(output, null).writeNamedWriteable(instance);
|
||||
} else {
|
||||
instance.writeTo(new PlanStreamOutput(output, null));
|
||||
}
|
||||
}
|
||||
|
||||
protected LocalSupplier readFrom(StreamInput input, TransportVersion version) throws IOException {
|
||||
if (version.onOrAfter(TransportVersions.ESQL_LOCAL_RELATION_WITH_NEW_BLOCKS)) {
|
||||
return new PlanStreamInput(input, getNamedWriteableRegistry(), null).readNamedWriteable(LocalSupplier.class);
|
||||
} else {
|
||||
return LocalSourceExec.readLegacyLocalSupplierFrom(new PlanStreamInput(input, getNamedWriteableRegistry(), null));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected LocalSupplier createTestInstance() {
|
||||
return randomBoolean() ? LocalSupplier.EMPTY : randomNonEmpty();
|
||||
return randomLocalSupplier();
|
||||
}
|
||||
|
||||
public static LocalSupplier randomLocalSupplier() {
|
||||
return randomBoolean() ? EmptyLocalSupplier.EMPTY : randomNonEmpty();
|
||||
}
|
||||
|
||||
public static LocalSupplier randomNonEmpty() {
|
||||
return LocalSupplier.of(randomList(1, 10, LocalSupplierTests::randomBlock).toArray(Block[]::new));
|
||||
Block[] blocks = randomList(1, 10, LocalSupplierTests::randomBlock).toArray(Block[]::new);
|
||||
return randomBoolean() ? LocalSupplier.of(blocks) : new CopyingLocalSupplier(blocks);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -54,7 +104,7 @@ public class LocalSupplierTests extends AbstractWireTestCase<LocalSupplier> {
|
|||
Block[] blocks = instance.get();
|
||||
if (blocks.length > 0 && randomBoolean()) {
|
||||
if (randomBoolean()) {
|
||||
return LocalSupplier.EMPTY;
|
||||
return EmptyLocalSupplier.EMPTY;
|
||||
}
|
||||
return LocalSupplier.of(Arrays.copyOf(blocks, blocks.length - 1, Block[].class));
|
||||
}
|
||||
|
@ -63,7 +113,12 @@ public class LocalSupplierTests extends AbstractWireTestCase<LocalSupplier> {
|
|||
return LocalSupplier.of(blocks);
|
||||
}
|
||||
|
||||
private static Block randomBlock() {
|
||||
@Override
|
||||
protected NamedWriteableRegistry getNamedWriteableRegistry() {
|
||||
return new NamedWriteableRegistry(PlanWritables.others());
|
||||
}
|
||||
|
||||
static Block randomBlock() {
|
||||
int len = between(1, 1000);
|
||||
try (IntBlock.Builder ints = BLOCK_FACTORY.newIntBlockBuilder(len)) {
|
||||
for (int i = 0; i < len; i++) {
|
||||
|
|
|
@ -19,7 +19,7 @@ public class LocalSourceExecSerializationTests extends AbstractPhysicalPlanSeria
|
|||
public static LocalSourceExec randomLocalSourceExec() {
|
||||
Source source = randomSource();
|
||||
List<Attribute> output = randomFieldAttributes(1, 9, false);
|
||||
LocalSupplier supplier = randomBoolean() ? LocalSupplier.EMPTY : LocalSupplierTests.randomNonEmpty();
|
||||
LocalSupplier supplier = LocalSupplierTests.randomLocalSupplier();
|
||||
return new LocalSourceExec(source, output, supplier);
|
||||
}
|
||||
|
||||
|
@ -35,7 +35,7 @@ public class LocalSourceExecSerializationTests extends AbstractPhysicalPlanSeria
|
|||
if (randomBoolean()) {
|
||||
output = randomValueOtherThan(output, () -> randomFieldAttributes(1, 9, false));
|
||||
} else {
|
||||
supplier = randomValueOtherThan(supplier, () -> randomBoolean() ? LocalSupplier.EMPTY : LocalSupplierTests.randomNonEmpty());
|
||||
supplier = randomValueOtherThan(supplier, () -> LocalSupplierTests.randomLocalSupplier());
|
||||
}
|
||||
return new LocalSourceExec(instance.source(), output, supplier);
|
||||
}
|
||||
|
|
|
@ -30,7 +30,7 @@ public class IndexResolverFieldNamesTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testBasicFromCommandWithInlinestats() {
|
||||
assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled());
|
||||
assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
|
||||
assertFieldNames("from test | inlinestats max(salary) by gender", ALL_FIELDS);
|
||||
}
|
||||
|
||||
|
@ -39,7 +39,7 @@ public class IndexResolverFieldNamesTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testBasicFromCommandWithMetadata_AndInlinestats() {
|
||||
assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled());
|
||||
assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
|
||||
assertFieldNames("from test metadata _index, _id, _version | inlinestats max(salary)", ALL_FIELDS);
|
||||
}
|
||||
|
||||
|
@ -305,7 +305,7 @@ public class IndexResolverFieldNamesTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testLimitZero_WithInlinestats() {
|
||||
assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled());
|
||||
assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
|
||||
assertFieldNames("""
|
||||
FROM employees
|
||||
| INLINESTATS COUNT(*), MAX(salary) BY gender
|
||||
|
@ -320,7 +320,7 @@ public class IndexResolverFieldNamesTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testDocsDropHeight_WithInlinestats() {
|
||||
assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled());
|
||||
assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
|
||||
assertFieldNames("""
|
||||
FROM employees
|
||||
| DROP height
|
||||
|
@ -336,7 +336,7 @@ public class IndexResolverFieldNamesTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testDocsDropHeightWithWildcard_AndInlinestats() {
|
||||
assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled());
|
||||
assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
|
||||
assertFieldNames("""
|
||||
FROM employees
|
||||
| INLINESTATS MAX(salary) BY gender
|
||||
|
@ -503,7 +503,7 @@ public class IndexResolverFieldNamesTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testSortWithLimitOne_DropHeight_WithInlinestats() {
|
||||
assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled());
|
||||
assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
|
||||
assertFieldNames("from employees | inlinestats avg(salary) by languages | sort languages | limit 1 | drop height*", ALL_FIELDS);
|
||||
}
|
||||
|
||||
|
@ -803,7 +803,7 @@ public class IndexResolverFieldNamesTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testFilterById_WithInlinestats() {
|
||||
assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled());
|
||||
assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
|
||||
assertFieldNames("FROM apps metadata _id | INLINESTATS max(rate) | WHERE _id == \"4\"", ALL_FIELDS);
|
||||
}
|
||||
|
||||
|
@ -1274,7 +1274,7 @@ public class IndexResolverFieldNamesTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testProjectDropPattern_WithInlinestats() {
|
||||
assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled());
|
||||
assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
|
||||
assertFieldNames("""
|
||||
from test
|
||||
| inlinestats max(foo) by bar
|
||||
|
@ -1357,7 +1357,7 @@ public class IndexResolverFieldNamesTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testCountAllAndOtherStatGrouped_WithInlinestats() {
|
||||
assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled());
|
||||
assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
|
||||
assertFieldNames("""
|
||||
from test
|
||||
| inlinestats c = count(*), min = min(emp_no) by languages
|
||||
|
@ -1396,7 +1396,7 @@ public class IndexResolverFieldNamesTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testCountAllWithEval_AndInlinestats() {
|
||||
assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled());
|
||||
assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
|
||||
assertFieldNames("""
|
||||
from test
|
||||
| rename languages as l
|
||||
|
@ -1409,7 +1409,7 @@ public class IndexResolverFieldNamesTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testKeepAfterEval_AndInlinestats() {
|
||||
assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled());
|
||||
assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
|
||||
assertFieldNames("""
|
||||
from test
|
||||
| rename languages as l
|
||||
|
@ -1422,7 +1422,7 @@ public class IndexResolverFieldNamesTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testKeepBeforeEval_AndInlinestats() {
|
||||
assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled());
|
||||
assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
|
||||
assertFieldNames("""
|
||||
from test
|
||||
| rename languages as l
|
||||
|
@ -1435,7 +1435,7 @@ public class IndexResolverFieldNamesTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testStatsBeforeEval_AndInlinestats() {
|
||||
assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled());
|
||||
assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
|
||||
assertFieldNames("""
|
||||
from test
|
||||
| rename languages as l
|
||||
|
@ -1447,7 +1447,7 @@ public class IndexResolverFieldNamesTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testStatsBeforeInlinestats() {
|
||||
assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled());
|
||||
assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
|
||||
assertFieldNames("""
|
||||
from test
|
||||
| stats min = min(salary) by languages
|
||||
|
@ -1456,7 +1456,7 @@ public class IndexResolverFieldNamesTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testKeepBeforeInlinestats() {
|
||||
assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V7.isEnabled());
|
||||
assumeTrue("INLINESTATS required", EsqlCapabilities.Cap.INLINESTATS_V8.isEnabled());
|
||||
assertFieldNames("""
|
||||
from test
|
||||
| keep languages, salary
|
||||
|
|
Loading…
Reference in New Issue