Speed up COALESCE significantly (#120139)

```
                      before              after
     (operation)   Score   Error       Score   Error  Units
 coalesce_2_noop  75.949 ± 3.961  ->   0.010 ±  0.001 ns/op  99.9%
coalesce_2_eager  99.299 ± 6.959  ->   4.292 ±  0.227 ns/op  95.7%
 coalesce_2_lazy 113.118 ± 5.747  ->  26.746 ±  0.954 ns/op  76.4%
```

We tend to advise folks that "COALESCE is faster than CASE", but, as of
8.16.0/https://github.com/elastic/elasticsearch/pull/112295 that wasn't true. I was working with someone a few
days ago to port a scripted_metric aggregation to ESQL and we saw
COALESCE taking ~60% of the time. That won't do.

The trouble is that CASE and COALESCE have to be *lazy*, meaning that
operations like:
```
COALESCE(a, 1 / b)
```
should never emit a warning if `a` is not `null`, even if `b` is `0`. In
8.16/https://github.com/elastic/elasticsearch/pull/112295 CASE grew an optimization where it could operate non-lazily
if it was flagged as "safe". This brings a similar optimization to
COALESCE, see it above as "coalesce_2_eager", a 95.7% improvement.

It also brings an arguably more important optimization - entire-block
execution for COALESCE. The short version is that, if the first
parameter of COALESCE returns no nulls we can return it without doing
anything lazily. There are a few more cases, but the upshot is that
COALESCE is pretty much *free* in cases where long strings of results
are `null` or not `null`. That's the `coalesce_2_noop` line.

Finally, when there are mixed null and non-null values we were using a
single builder with some fairly inefficient paths. This specializes them
per type and skips some slow null-checking where possible. That's the
`coalesce_2_lazy` result, a more modest 76.4%.

NOTE: These are %s of improvement on COALESCE itself, or COALESCE with some low-overhead operators like `+`. If COALESCE isn't taking a *ton* of time in your query don't get particularly excited about this. It's fun though.

Closes #119953
This commit is contained in:
Nik Everett 2025-01-23 12:40:09 -05:00 committed by GitHub
parent f27f74666f
commit dc4fa26174
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
30 changed files with 1943 additions and 195 deletions

1
.gitattributes vendored
View File

@ -11,6 +11,7 @@ x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/*.interp li
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/EsqlBaseLexer*.java linguist-generated=true
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/EsqlBaseParser*.java linguist-generated=true
x-pack/plugin/esql/src/main/generated/** linguist-generated=true
x-pack/plugin/esql/src/main/generated-src/** linguist-generated=true
# ESQL functions docs are autogenerated. More information at `docs/reference/esql/functions/README.md`
docs/reference/esql/functions/*/** linguist-generated=true

View File

@ -126,9 +126,12 @@ exit
Grab the async profiler from https://github.com/jvm-profiling-tools/async-profiler
and run `prof async` like so:
```
gradlew -p benchmarks/ run --args 'LongKeyedBucketOrdsBenchmark.multiBucket -prof "async:libPath=/home/nik9000/Downloads/tmp/async-profiler-1.8.3-linux-x64/build/libasyncProfiler.so;dir=/tmp/prof;output=flamegraph"'
gradlew -p benchmarks/ run --args 'LongKeyedBucketOrdsBenchmark.multiBucket -prof "async:libPath=/home/nik9000/Downloads/async-profiler-3.0-29ee888-linux-x64/lib/libasyncProfiler.so;dir=/tmp/prof;output=flamegraph"'
```
Note: As of January 2025 the latest release of async profiler doesn't work
with our JDK but the nightly is fine.
If you are on Mac, this'll warn you that you downloaded the shared library from
the internet. You'll need to go to settings and allow it to run.

View File

@ -38,6 +38,7 @@ import org.elasticsearch.xpack.esql.expression.function.scalar.conditional.Case;
import org.elasticsearch.xpack.esql.expression.function.scalar.date.DateTrunc;
import org.elasticsearch.xpack.esql.expression.function.scalar.math.Abs;
import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvMin;
import org.elasticsearch.xpack.esql.expression.function.scalar.nulls.Coalesce;
import org.elasticsearch.xpack.esql.expression.function.scalar.string.RLike;
import org.elasticsearch.xpack.esql.expression.predicate.operator.arithmetic.Add;
import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.Equals;
@ -96,6 +97,9 @@ public class EvalBenchmark {
"add_double",
"case_1_eager",
"case_1_lazy",
"coalesce_2_noop",
"coalesce_2_eager",
"coalesce_2_lazy",
"date_trunc",
"equal_to_const",
"long_equal_to_long",
@ -142,8 +146,34 @@ public class EvalBenchmark {
lhs = new Add(Source.EMPTY, lhs, new Literal(Source.EMPTY, 1L, DataType.LONG));
rhs = new Add(Source.EMPTY, rhs, new Literal(Source.EMPTY, 1L, DataType.LONG));
}
yield EvalMapper.toEvaluator(FOLD_CONTEXT, new Case(Source.EMPTY, condition, List.of(lhs, rhs)), layout(f1, f2))
.get(driverContext);
EvalOperator.ExpressionEvaluator evaluator = EvalMapper.toEvaluator(
FOLD_CONTEXT,
new Case(Source.EMPTY, condition, List.of(lhs, rhs)),
layout(f1, f2)
).get(driverContext);
String desc = operation.endsWith("lazy") ? "CaseLazyEvaluator" : "CaseEagerEvaluator";
if (evaluator.toString().contains(desc) == false) {
throw new IllegalArgumentException("Evaluator was [" + evaluator + "] but expected one containing [" + desc + "]");
}
yield evaluator;
}
case "coalesce_2_noop", "coalesce_2_eager", "coalesce_2_lazy" -> {
FieldAttribute f1 = longField();
FieldAttribute f2 = longField();
Expression lhs = f1;
if (operation.endsWith("lazy")) {
lhs = new Add(Source.EMPTY, lhs, new Literal(Source.EMPTY, 1L, DataType.LONG));
}
EvalOperator.ExpressionEvaluator evaluator = EvalMapper.toEvaluator(
FOLD_CONTEXT,
new Coalesce(Source.EMPTY, lhs, List.of(f2)),
layout(f1, f2)
).get(driverContext);
String desc = operation.endsWith("lazy") ? "CoalesceLazyEvaluator" : "CoalesceEagerEvaluator";
if (evaluator.toString().contains(desc) == false) {
throw new IllegalArgumentException("Evaluator was [" + evaluator + "] but expected one containing [" + desc + "]");
}
yield evaluator;
}
case "date_trunc" -> {
FieldAttribute timestamp = new FieldAttribute(
@ -260,6 +290,38 @@ public class EvalBenchmark {
}
}
}
case "coalesce_2_noop" -> {
LongVector f1 = actual.<LongBlock>getBlock(0).asVector();
LongVector result = actual.<LongBlock>getBlock(2).asVector();
for (int i = 0; i < BLOCK_LENGTH; i++) {
long expected = f1.getLong(i);
if (result.getLong(i) != expected) {
throw new AssertionError("[" + operation + "] expected [" + expected + "] but was [" + result.getLong(i) + "]");
}
}
}
case "coalesce_2_eager" -> {
LongBlock f1 = actual.<LongBlock>getBlock(0);
LongVector f2 = actual.<LongBlock>getBlock(1).asVector();
LongVector result = actual.<LongBlock>getBlock(2).asVector();
for (int i = 0; i < BLOCK_LENGTH; i++) {
long expected = i % 5 == 0 ? f2.getLong(i) : f1.getLong(f1.getFirstValueIndex(i));
if (result.getLong(i) != expected) {
throw new AssertionError("[" + operation + "] expected [" + expected + "] but was [" + result.getLong(i) + "]");
}
}
}
case "coalesce_2_lazy" -> {
LongBlock f1 = actual.<LongBlock>getBlock(0);
LongVector f2 = actual.<LongBlock>getBlock(1).asVector();
LongVector result = actual.<LongBlock>getBlock(2).asVector();
for (int i = 0; i < BLOCK_LENGTH; i++) {
long expected = i % 5 == 0 ? f2.getLong(i) : f1.getLong(f1.getFirstValueIndex(i)) + 1;
if (result.getLong(i) != expected) {
throw new AssertionError("[" + operation + "] expected [" + expected + "] but was [" + result.getLong(i) + "]");
}
}
}
case "date_trunc" -> {
LongVector v = actual.<LongBlock>getBlock(1).asVector();
long oneDay = TimeValue.timeValueHours(24).millis();
@ -304,7 +366,7 @@ public class EvalBenchmark {
}
}
}
default -> throw new UnsupportedOperationException();
default -> throw new UnsupportedOperationException(operation);
}
}
@ -324,7 +386,7 @@ public class EvalBenchmark {
}
yield new Page(builder.build());
}
case "case_1_eager", "case_1_lazy" -> {
case "case_1_eager", "case_1_lazy", "coalesce_2_noop" -> {
var f1 = blockFactory.newLongBlockBuilder(BLOCK_LENGTH);
var f2 = blockFactory.newLongBlockBuilder(BLOCK_LENGTH);
for (int i = 0; i < BLOCK_LENGTH; i++) {
@ -333,6 +395,19 @@ public class EvalBenchmark {
}
yield new Page(f1.build(), f2.build());
}
case "coalesce_2_eager", "coalesce_2_lazy" -> {
var f1 = blockFactory.newLongBlockBuilder(BLOCK_LENGTH);
var f2 = blockFactory.newLongBlockBuilder(BLOCK_LENGTH);
for (int i = 0; i < BLOCK_LENGTH; i++) {
if (i % 5 == 0) {
f1.appendNull();
} else {
f1.appendLong(i);
}
f2.appendLong(-i);
}
yield new Page(f1.build(), f2.build());
}
case "long_equal_to_long" -> {
var lhs = blockFactory.newLongBlockBuilder(BLOCK_LENGTH);
var rhs = blockFactory.newLongBlockBuilder(BLOCK_LENGTH);

View File

@ -348,4 +348,31 @@ tasks.named('stringTemplates').configure {
it.inputFile = inInputFile
it.outputFile = "org/elasticsearch/xpack/esql/expression/predicate/operator/comparison/InBytesRefEvaluator.java"
}
File coalesceInputFile = file("src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/nulls/X-CoalesceEvaluator.java.st")
template {
it.properties = booleanProperties
it.inputFile = coalesceInputFile
it.outputFile = "org/elasticsearch/xpack/esql/expression/function/scalar/nulls/CoalesceBooleanEvaluator.java"
}
template {
it.properties = intProperties
it.inputFile = coalesceInputFile
it.outputFile = "org/elasticsearch/xpack/esql/expression/function/scalar/nulls/CoalesceIntEvaluator.java"
}
template {
it.properties = longProperties
it.inputFile = coalesceInputFile
it.outputFile = "org/elasticsearch/xpack/esql/expression/function/scalar/nulls/CoalesceLongEvaluator.java"
}
template {
it.properties = doubleProperties
it.inputFile = coalesceInputFile
it.outputFile = "org/elasticsearch/xpack/esql/expression/function/scalar/nulls/CoalesceDoubleEvaluator.java"
}
template {
it.properties = bytesRefProperties
it.inputFile = coalesceInputFile
it.outputFile = "org/elasticsearch/xpack/esql/expression/function/scalar/nulls/CoalesceBytesRefEvaluator.java"
}
}

View File

@ -223,6 +223,14 @@ public sealed interface BooleanBlock extends Block permits BooleanArrayBlock, Bo
*/
Builder copyFrom(BooleanBlock block, int beginInclusive, int endExclusive);
/**
* Copy the values in {@code block} at {@code position}. If this position
* has a single value, this'll copy a single value. If this positions has
* many values, it'll copy all of them. If this is {@code null}, then it'll
* copy the {@code null}.
*/
Builder copyFrom(BooleanBlock block, int position);
@Override
Builder appendNull();

View File

@ -7,6 +7,7 @@
package org.elasticsearch.compute.data;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.util.BitArray;
@ -85,7 +86,11 @@ final class BooleanBlockBuilder extends AbstractBlockBuilder implements BooleanB
/**
* Copy the values in {@code block} from {@code beginInclusive} to
* {@code endExclusive} into this builder.
* <p>
* For single-position copies see {@link #copyFrom(BooleanBlock, int)}.
* </p>
*/
@Override
public BooleanBlockBuilder copyFrom(BooleanBlock block, int beginInclusive, int endExclusive) {
if (endExclusive > block.getPositionCount()) {
throw new IllegalArgumentException("can't copy past the end [" + endExclusive + " > " + block.getPositionCount() + "]");
@ -101,21 +106,7 @@ final class BooleanBlockBuilder extends AbstractBlockBuilder implements BooleanB
private void copyFromBlock(BooleanBlock block, int beginInclusive, int endExclusive) {
for (int p = beginInclusive; p < endExclusive; p++) {
if (block.isNull(p)) {
appendNull();
continue;
}
int count = block.getValueCount(p);
if (count > 1) {
beginPositionEntry();
}
int i = block.getFirstValueIndex(p);
for (int v = 0; v < count; v++) {
appendBoolean(block.getBoolean(i++));
}
if (count > 1) {
endPositionEntry();
}
copyFrom(block, p);
}
}
@ -125,6 +116,37 @@ final class BooleanBlockBuilder extends AbstractBlockBuilder implements BooleanB
}
}
/**
* Copy the values in {@code block} at {@code position}. If this position
* has a single value, this'll copy a single value. If this positions has
* many values, it'll copy all of them. If this is {@code null}, then it'll
* copy the {@code null}.
* <p>
* Note that there isn't a version of this method on {@link Block.Builder} that takes
* {@link Block}. That'd be quite slow, running position by position. And it's important
* to know if you are copying {@link BytesRef}s so you can have the scratch.
* </p>
*/
@Override
public BooleanBlockBuilder copyFrom(BooleanBlock block, int position) {
if (block.isNull(position)) {
appendNull();
return this;
}
int count = block.getValueCount(position);
int i = block.getFirstValueIndex(position);
if (count == 1) {
appendBoolean(block.getBoolean(i++));
return this;
}
beginPositionEntry();
for (int v = 0; v < count; v++) {
appendBoolean(block.getBoolean(i++));
}
endPositionEntry();
return this;
}
@Override
public BooleanBlockBuilder mvOrdering(Block.MvOrdering mvOrdering) {
this.mvOrdering = mvOrdering;

View File

@ -228,6 +228,16 @@ public sealed interface BytesRefBlock extends Block permits BytesRefArrayBlock,
*/
Builder copyFrom(BytesRefBlock block, int beginInclusive, int endExclusive);
/**
* Copy the values in {@code block} at {@code position}. If this position
* has a single value, this'll copy a single value. If this positions has
* many values, it'll copy all of them. If this is {@code null}, then it'll
* copy the {@code null}.
* @param scratch Scratch string used to prevent allocation. Share this
between many calls to this function.
*/
Builder copyFrom(BytesRefBlock block, int position, BytesRef scratch);
@Override
Builder appendNull();

View File

@ -88,7 +88,11 @@ final class BytesRefBlockBuilder extends AbstractBlockBuilder implements BytesRe
/**
* Copy the values in {@code block} from {@code beginInclusive} to
* {@code endExclusive} into this builder.
* <p>
* For single-position copies see {@link #copyFrom(BytesRefBlock, int, BytesRef scratch)}.
* </p>
*/
@Override
public BytesRefBlockBuilder copyFrom(BytesRefBlock block, int beginInclusive, int endExclusive) {
if (endExclusive > block.getPositionCount()) {
throw new IllegalArgumentException("can't copy past the end [" + endExclusive + " > " + block.getPositionCount() + "]");
@ -105,21 +109,7 @@ final class BytesRefBlockBuilder extends AbstractBlockBuilder implements BytesRe
private void copyFromBlock(BytesRefBlock block, int beginInclusive, int endExclusive) {
BytesRef scratch = new BytesRef();
for (int p = beginInclusive; p < endExclusive; p++) {
if (block.isNull(p)) {
appendNull();
continue;
}
int count = block.getValueCount(p);
if (count > 1) {
beginPositionEntry();
}
int i = block.getFirstValueIndex(p);
for (int v = 0; v < count; v++) {
appendBytesRef(block.getBytesRef(i++, scratch));
}
if (count > 1) {
endPositionEntry();
}
copyFrom(block, p, scratch);
}
}
@ -130,6 +120,39 @@ final class BytesRefBlockBuilder extends AbstractBlockBuilder implements BytesRe
}
}
/**
* Copy the values in {@code block} at {@code position}. If this position
* has a single value, this'll copy a single value. If this positions has
* many values, it'll copy all of them. If this is {@code null}, then it'll
* copy the {@code null}.
* @param scratch Scratch string used to prevent allocation. Share this
between many calls to this function.
* <p>
* Note that there isn't a version of this method on {@link Block.Builder} that takes
* {@link Block}. That'd be quite slow, running position by position. And it's important
* to know if you are copying {@link BytesRef}s so you can have the scratch.
* </p>
*/
@Override
public BytesRefBlockBuilder copyFrom(BytesRefBlock block, int position, BytesRef scratch) {
if (block.isNull(position)) {
appendNull();
return this;
}
int count = block.getValueCount(position);
int i = block.getFirstValueIndex(position);
if (count == 1) {
appendBytesRef(block.getBytesRef(i++, scratch));
return this;
}
beginPositionEntry();
for (int v = 0; v < count; v++) {
appendBytesRef(block.getBytesRef(i++, scratch));
}
endPositionEntry();
return this;
}
@Override
public BytesRefBlockBuilder mvOrdering(Block.MvOrdering mvOrdering) {
this.mvOrdering = mvOrdering;

View File

@ -217,6 +217,14 @@ public sealed interface DoubleBlock extends Block permits DoubleArrayBlock, Doub
*/
Builder copyFrom(DoubleBlock block, int beginInclusive, int endExclusive);
/**
* Copy the values in {@code block} at {@code position}. If this position
* has a single value, this'll copy a single value. If this positions has
* many values, it'll copy all of them. If this is {@code null}, then it'll
* copy the {@code null}.
*/
Builder copyFrom(DoubleBlock block, int position);
@Override
Builder appendNull();

View File

@ -7,6 +7,7 @@
package org.elasticsearch.compute.data;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.util.DoubleArray;
@ -85,7 +86,11 @@ final class DoubleBlockBuilder extends AbstractBlockBuilder implements DoubleBlo
/**
* Copy the values in {@code block} from {@code beginInclusive} to
* {@code endExclusive} into this builder.
* <p>
* For single-position copies see {@link #copyFrom(DoubleBlock, int)}.
* </p>
*/
@Override
public DoubleBlockBuilder copyFrom(DoubleBlock block, int beginInclusive, int endExclusive) {
if (endExclusive > block.getPositionCount()) {
throw new IllegalArgumentException("can't copy past the end [" + endExclusive + " > " + block.getPositionCount() + "]");
@ -101,21 +106,7 @@ final class DoubleBlockBuilder extends AbstractBlockBuilder implements DoubleBlo
private void copyFromBlock(DoubleBlock block, int beginInclusive, int endExclusive) {
for (int p = beginInclusive; p < endExclusive; p++) {
if (block.isNull(p)) {
appendNull();
continue;
}
int count = block.getValueCount(p);
if (count > 1) {
beginPositionEntry();
}
int i = block.getFirstValueIndex(p);
for (int v = 0; v < count; v++) {
appendDouble(block.getDouble(i++));
}
if (count > 1) {
endPositionEntry();
}
copyFrom(block, p);
}
}
@ -125,6 +116,37 @@ final class DoubleBlockBuilder extends AbstractBlockBuilder implements DoubleBlo
}
}
/**
* Copy the values in {@code block} at {@code position}. If this position
* has a single value, this'll copy a single value. If this positions has
* many values, it'll copy all of them. If this is {@code null}, then it'll
* copy the {@code null}.
* <p>
* Note that there isn't a version of this method on {@link Block.Builder} that takes
* {@link Block}. That'd be quite slow, running position by position. And it's important
* to know if you are copying {@link BytesRef}s so you can have the scratch.
* </p>
*/
@Override
public DoubleBlockBuilder copyFrom(DoubleBlock block, int position) {
if (block.isNull(position)) {
appendNull();
return this;
}
int count = block.getValueCount(position);
int i = block.getFirstValueIndex(position);
if (count == 1) {
appendDouble(block.getDouble(i++));
return this;
}
beginPositionEntry();
for (int v = 0; v < count; v++) {
appendDouble(block.getDouble(i++));
}
endPositionEntry();
return this;
}
@Override
public DoubleBlockBuilder mvOrdering(Block.MvOrdering mvOrdering) {
this.mvOrdering = mvOrdering;

View File

@ -216,6 +216,14 @@ public sealed interface FloatBlock extends Block permits FloatArrayBlock, FloatV
*/
Builder copyFrom(FloatBlock block, int beginInclusive, int endExclusive);
/**
* Copy the values in {@code block} at {@code position}. If this position
* has a single value, this'll copy a single value. If this positions has
* many values, it'll copy all of them. If this is {@code null}, then it'll
* copy the {@code null}.
*/
Builder copyFrom(FloatBlock block, int position);
@Override
Builder appendNull();

View File

@ -7,6 +7,7 @@
package org.elasticsearch.compute.data;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.util.FloatArray;
@ -85,7 +86,11 @@ final class FloatBlockBuilder extends AbstractBlockBuilder implements FloatBlock
/**
* Copy the values in {@code block} from {@code beginInclusive} to
* {@code endExclusive} into this builder.
* <p>
* For single-position copies see {@link #copyFrom(FloatBlock, int)}.
* </p>
*/
@Override
public FloatBlockBuilder copyFrom(FloatBlock block, int beginInclusive, int endExclusive) {
if (endExclusive > block.getPositionCount()) {
throw new IllegalArgumentException("can't copy past the end [" + endExclusive + " > " + block.getPositionCount() + "]");
@ -101,21 +106,7 @@ final class FloatBlockBuilder extends AbstractBlockBuilder implements FloatBlock
private void copyFromBlock(FloatBlock block, int beginInclusive, int endExclusive) {
for (int p = beginInclusive; p < endExclusive; p++) {
if (block.isNull(p)) {
appendNull();
continue;
}
int count = block.getValueCount(p);
if (count > 1) {
beginPositionEntry();
}
int i = block.getFirstValueIndex(p);
for (int v = 0; v < count; v++) {
appendFloat(block.getFloat(i++));
}
if (count > 1) {
endPositionEntry();
}
copyFrom(block, p);
}
}
@ -125,6 +116,37 @@ final class FloatBlockBuilder extends AbstractBlockBuilder implements FloatBlock
}
}
/**
* Copy the values in {@code block} at {@code position}. If this position
* has a single value, this'll copy a single value. If this positions has
* many values, it'll copy all of them. If this is {@code null}, then it'll
* copy the {@code null}.
* <p>
* Note that there isn't a version of this method on {@link Block.Builder} that takes
* {@link Block}. That'd be quite slow, running position by position. And it's important
* to know if you are copying {@link BytesRef}s so you can have the scratch.
* </p>
*/
@Override
public FloatBlockBuilder copyFrom(FloatBlock block, int position) {
if (block.isNull(position)) {
appendNull();
return this;
}
int count = block.getValueCount(position);
int i = block.getFirstValueIndex(position);
if (count == 1) {
appendFloat(block.getFloat(i++));
return this;
}
beginPositionEntry();
for (int v = 0; v < count; v++) {
appendFloat(block.getFloat(i++));
}
endPositionEntry();
return this;
}
@Override
public FloatBlockBuilder mvOrdering(Block.MvOrdering mvOrdering) {
this.mvOrdering = mvOrdering;

View File

@ -216,6 +216,14 @@ public sealed interface IntBlock extends Block permits IntArrayBlock, IntVectorB
*/
Builder copyFrom(IntBlock block, int beginInclusive, int endExclusive);
/**
* Copy the values in {@code block} at {@code position}. If this position
* has a single value, this'll copy a single value. If this positions has
* many values, it'll copy all of them. If this is {@code null}, then it'll
* copy the {@code null}.
*/
Builder copyFrom(IntBlock block, int position);
@Override
Builder appendNull();

View File

@ -7,6 +7,7 @@
package org.elasticsearch.compute.data;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.util.IntArray;
@ -85,7 +86,11 @@ final class IntBlockBuilder extends AbstractBlockBuilder implements IntBlock.Bui
/**
* Copy the values in {@code block} from {@code beginInclusive} to
* {@code endExclusive} into this builder.
* <p>
* For single-position copies see {@link #copyFrom(IntBlock, int)}.
* </p>
*/
@Override
public IntBlockBuilder copyFrom(IntBlock block, int beginInclusive, int endExclusive) {
if (endExclusive > block.getPositionCount()) {
throw new IllegalArgumentException("can't copy past the end [" + endExclusive + " > " + block.getPositionCount() + "]");
@ -101,21 +106,7 @@ final class IntBlockBuilder extends AbstractBlockBuilder implements IntBlock.Bui
private void copyFromBlock(IntBlock block, int beginInclusive, int endExclusive) {
for (int p = beginInclusive; p < endExclusive; p++) {
if (block.isNull(p)) {
appendNull();
continue;
}
int count = block.getValueCount(p);
if (count > 1) {
beginPositionEntry();
}
int i = block.getFirstValueIndex(p);
for (int v = 0; v < count; v++) {
appendInt(block.getInt(i++));
}
if (count > 1) {
endPositionEntry();
}
copyFrom(block, p);
}
}
@ -125,6 +116,37 @@ final class IntBlockBuilder extends AbstractBlockBuilder implements IntBlock.Bui
}
}
/**
* Copy the values in {@code block} at {@code position}. If this position
* has a single value, this'll copy a single value. If this positions has
* many values, it'll copy all of them. If this is {@code null}, then it'll
* copy the {@code null}.
* <p>
* Note that there isn't a version of this method on {@link Block.Builder} that takes
* {@link Block}. That'd be quite slow, running position by position. And it's important
* to know if you are copying {@link BytesRef}s so you can have the scratch.
* </p>
*/
@Override
public IntBlockBuilder copyFrom(IntBlock block, int position) {
if (block.isNull(position)) {
appendNull();
return this;
}
int count = block.getValueCount(position);
int i = block.getFirstValueIndex(position);
if (count == 1) {
appendInt(block.getInt(i++));
return this;
}
beginPositionEntry();
for (int v = 0; v < count; v++) {
appendInt(block.getInt(i++));
}
endPositionEntry();
return this;
}
@Override
public IntBlockBuilder mvOrdering(Block.MvOrdering mvOrdering) {
this.mvOrdering = mvOrdering;

View File

@ -217,6 +217,14 @@ public sealed interface LongBlock extends Block permits LongArrayBlock, LongVect
*/
Builder copyFrom(LongBlock block, int beginInclusive, int endExclusive);
/**
* Copy the values in {@code block} at {@code position}. If this position
* has a single value, this'll copy a single value. If this positions has
* many values, it'll copy all of them. If this is {@code null}, then it'll
* copy the {@code null}.
*/
Builder copyFrom(LongBlock block, int position);
@Override
Builder appendNull();

View File

@ -7,6 +7,7 @@
package org.elasticsearch.compute.data;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.util.LongArray;
@ -85,7 +86,11 @@ final class LongBlockBuilder extends AbstractBlockBuilder implements LongBlock.B
/**
* Copy the values in {@code block} from {@code beginInclusive} to
* {@code endExclusive} into this builder.
* <p>
* For single-position copies see {@link #copyFrom(LongBlock, int)}.
* </p>
*/
@Override
public LongBlockBuilder copyFrom(LongBlock block, int beginInclusive, int endExclusive) {
if (endExclusive > block.getPositionCount()) {
throw new IllegalArgumentException("can't copy past the end [" + endExclusive + " > " + block.getPositionCount() + "]");
@ -101,21 +106,7 @@ final class LongBlockBuilder extends AbstractBlockBuilder implements LongBlock.B
private void copyFromBlock(LongBlock block, int beginInclusive, int endExclusive) {
for (int p = beginInclusive; p < endExclusive; p++) {
if (block.isNull(p)) {
appendNull();
continue;
}
int count = block.getValueCount(p);
if (count > 1) {
beginPositionEntry();
}
int i = block.getFirstValueIndex(p);
for (int v = 0; v < count; v++) {
appendLong(block.getLong(i++));
}
if (count > 1) {
endPositionEntry();
}
copyFrom(block, p);
}
}
@ -125,6 +116,37 @@ final class LongBlockBuilder extends AbstractBlockBuilder implements LongBlock.B
}
}
/**
* Copy the values in {@code block} at {@code position}. If this position
* has a single value, this'll copy a single value. If this positions has
* many values, it'll copy all of them. If this is {@code null}, then it'll
* copy the {@code null}.
* <p>
* Note that there isn't a version of this method on {@link Block.Builder} that takes
* {@link Block}. That'd be quite slow, running position by position. And it's important
* to know if you are copying {@link BytesRef}s so you can have the scratch.
* </p>
*/
@Override
public LongBlockBuilder copyFrom(LongBlock block, int position) {
if (block.isNull(position)) {
appendNull();
return this;
}
int count = block.getValueCount(position);
int i = block.getFirstValueIndex(position);
if (count == 1) {
appendLong(block.getLong(i++));
return this;
}
beginPositionEntry();
for (int v = 0; v < count; v++) {
appendLong(block.getLong(i++));
}
endPositionEntry();
return this;
}
@Override
public LongBlockBuilder mvOrdering(Block.MvOrdering mvOrdering) {
this.mvOrdering = mvOrdering;

View File

@ -280,6 +280,11 @@ public interface Block extends Accountable, BlockLoader.Block, NamedWriteable, R
/**
* Copy the values in {@code block} from {@code beginInclusive} to
* {@code endExclusive} into this builder.
* <p>
* For single position copies use the faster
* {@link IntBlockBuilder#copyFrom(IntBlock, int)},
* {@link LongBlockBuilder#copyFrom(LongBlock, int)}, etc.
* </p>
*/
Builder copyFrom(Block block, int beginInclusive, int endExclusive);

View File

@ -288,6 +288,18 @@ $endif$
*/
Builder copyFrom($Type$Block block, int beginInclusive, int endExclusive);
/**
* Copy the values in {@code block} at {@code position}. If this position
* has a single value, this'll copy a single value. If this positions has
* many values, it'll copy all of them. If this is {@code null}, then it'll
* copy the {@code null}.
$if(BytesRef)$
* @param scratch Scratch string used to prevent allocation. Share this
between many calls to this function.
$endif$
*/
Builder copyFrom($Type$Block block, int position$if(BytesRef)$, BytesRef scratch$endif$);
@Override
Builder appendNull();

View File

@ -15,6 +15,7 @@ import org.elasticsearch.common.util.BytesRefArray;
import org.elasticsearch.core.Releasables;
$else$
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.util.$Array$;
@ -123,7 +124,11 @@ $endif$
/**
* Copy the values in {@code block} from {@code beginInclusive} to
* {@code endExclusive} into this builder.
* <p>
* For single-position copies see {@link #copyFrom($Type$Block, int$if(BytesRef)$, BytesRef scratch$endif$)}.
* </p>
*/
@Override
public $Type$BlockBuilder copyFrom($Type$Block block, int beginInclusive, int endExclusive) {
if (endExclusive > block.getPositionCount()) {
throw new IllegalArgumentException("can't copy past the end [" + endExclusive + " > " + block.getPositionCount() + "]");
@ -142,25 +147,7 @@ $if(BytesRef)$
BytesRef scratch = new BytesRef();
$endif$
for (int p = beginInclusive; p < endExclusive; p++) {
if (block.isNull(p)) {
appendNull();
continue;
}
int count = block.getValueCount(p);
if (count > 1) {
beginPositionEntry();
}
int i = block.getFirstValueIndex(p);
for (int v = 0; v < count; v++) {
$if(BytesRef)$
appendBytesRef(block.getBytesRef(i++, scratch));
$else$
append$Type$(block.get$Type$(i++));
$endif$
}
if (count > 1) {
endPositionEntry();
}
copyFrom(block, p$if(BytesRef)$, scratch$endif$);
}
}
@ -177,6 +164,41 @@ $endif$
}
}
/**
* Copy the values in {@code block} at {@code position}. If this position
* has a single value, this'll copy a single value. If this positions has
* many values, it'll copy all of them. If this is {@code null}, then it'll
* copy the {@code null}.
$if(BytesRef)$
* @param scratch Scratch string used to prevent allocation. Share this
between many calls to this function.
$endif$
* <p>
* Note that there isn't a version of this method on {@link Block.Builder} that takes
* {@link Block}. That'd be quite slow, running position by position. And it's important
* to know if you are copying {@link BytesRef}s so you can have the scratch.
* </p>
*/
@Override
public $Type$BlockBuilder copyFrom($Type$Block block, int position$if(BytesRef)$, BytesRef scratch$endif$) {
if (block.isNull(position)) {
appendNull();
return this;
}
int count = block.getValueCount(position);
int i = block.getFirstValueIndex(position);
if (count == 1) {
append$Type$(block.get$Type$(i++$if(BytesRef)$, scratch$endif$));
return this;
}
beginPositionEntry();
for (int v = 0; v < count; v++) {
append$Type$(block.get$Type$(i++$if(BytesRef)$, scratch$endif$));
}
endPositionEntry();
return this;
}
@Override
public $Type$BlockBuilder mvOrdering(Block.MvOrdering mvOrdering) {
this.mvOrdering = mvOrdering;

View File

@ -96,12 +96,18 @@ public class EvalOperator extends AbstractPageMappingOperator {
public void close() {
}
@Override
public String toString() {
return CONSTANT_NULL_NAME;
}
};
}
@Override
public String toString() {
return "ConstantNull";
return CONSTANT_NULL_NAME;
}
};
private static final String CONSTANT_NULL_NAME = "ConstantNull";
}

View File

@ -10,6 +10,7 @@ package org.elasticsearch.compute.data;
import com.carrotsearch.randomizedtesting.annotations.Name;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.compute.test.RandomBlock;
import org.elasticsearch.compute.test.TestBlockFactory;
import org.elasticsearch.test.ESTestCase;
@ -92,7 +93,16 @@ public class BlockBuilderCopyFromTests extends ESTestCase {
Block.Builder builder = elementType.newBlockBuilder(block.getPositionCount() / 2, blockFactory);
List<List<Object>> expected = new ArrayList<>();
for (int i = 0; i < block.getPositionCount(); i += 2) {
builder.copyFrom(block, i, i + 1);
switch (elementType) {
case BOOLEAN -> ((BooleanBlockBuilder) builder).copyFrom((BooleanBlock) block, i);
case BYTES_REF -> ((BytesRefBlockBuilder) builder).copyFrom((BytesRefBlock) block, i, new BytesRef());
case DOUBLE -> ((DoubleBlockBuilder) builder).copyFrom((DoubleBlock) block, i);
case FLOAT -> ((FloatBlockBuilder) builder).copyFrom((FloatBlock) block, i);
case INT -> ((IntBlockBuilder) builder).copyFrom((IntBlock) block, i);
case LONG -> ((LongBlockBuilder) builder).copyFrom((LongBlock) block, i);
default -> throw new IllegalArgumentException("unsupported type: " + elementType);
}
expected.add(valuesAtPositions(block, i, i + 1).get(0));
}
assertBlockValues(builder.build(), expected);

View File

@ -0,0 +1,225 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.esql.expression.function.scalar.nulls;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BooleanBlock;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.EvalOperator;
import org.elasticsearch.compute.operator.EvalOperator.ExpressionEvaluator;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.evaluator.mapper.EvaluatorMapper;
import java.util.List;
import java.util.stream.IntStream;
/**
 * {@link EvalOperator.ExpressionEvaluator} implementation for {@link Coalesce}.
 * This class is generated. Edit {@code X-InEvaluator.java.st} instead.
 */
abstract sealed class CoalesceBooleanEvaluator implements EvalOperator.ExpressionEvaluator permits
    CoalesceBooleanEvaluator.CoalesceBooleanEagerEvaluator, //
    CoalesceBooleanEvaluator.CoalesceBooleanLazyEvaluator {

    /**
     * Build a factory for COALESCE over {@code children}. Picks the eager
     * implementation when every child evaluator is safe to run eagerly in a
     * lazy environment, and the (slower) lazy implementation otherwise.
     */
    static ExpressionEvaluator.Factory toEvaluator(EvaluatorMapper.ToEvaluator toEvaluator, List<Expression> children) {
        List<ExpressionEvaluator.Factory> childEvaluators = children.stream().map(toEvaluator::apply).toList();
        if (childEvaluators.stream().allMatch(ExpressionEvaluator.Factory::eagerEvalSafeInLazy)) {
            return new ExpressionEvaluator.Factory() {
                @Override
                public ExpressionEvaluator get(DriverContext context) {
                    return new CoalesceBooleanEagerEvaluator(context, childEvaluators.stream().map(x -> x.get(context)).toList());
                }

                @Override
                public String toString() {
                    return "CoalesceBooleanEagerEvaluator[values=" + childEvaluators + ']';
                }
            };
        }
        return new ExpressionEvaluator.Factory() {
            @Override
            public ExpressionEvaluator get(DriverContext context) {
                return new CoalesceBooleanLazyEvaluator(context, childEvaluators.stream().map(x -> x.get(context)).toList());
            }

            @Override
            public String toString() {
                return "CoalesceBooleanLazyEvaluator[values=" + childEvaluators + ']';
            }
        };
    }

    protected final DriverContext driverContext;
    protected final List<EvalOperator.ExpressionEvaluator> evaluators;

    protected CoalesceBooleanEvaluator(DriverContext driverContext, List<EvalOperator.ExpressionEvaluator> evaluators) {
        this.driverContext = driverContext;
        this.evaluators = evaluators;
    }

    @Override
    public final BooleanBlock eval(Page page) {
        return entireBlock(page);
    }

    /**
     * Evaluate COALESCE for an entire {@link Block} for as long as we can, then shift to
     * {@link #perPosition} evaluation.
     * <p>
     * Entire Block evaluation is the "normal" way to run the compute engine,
     * just calling {@link EvalOperator.ExpressionEvaluator#eval}. It's much faster so we try
     * that first. For each evaluator, we {@linkplain EvalOperator.ExpressionEvaluator#eval} and:
     * </p>
     * <ul>
     * <li>If the {@linkplain Block} doesn't have any nulls we return it. COALESCE done.</li>
     * <li>If the {@linkplain Block} is only nulls we skip it and try the next evaluator.</li>
     * <li>If this is the last evaluator we just return it. COALESCE done.</li>
     * <li>
     * Otherwise, the {@linkplain Block} has mixed nulls and non-nulls so we drop
     * into a per position evaluator.
     * </li>
     * </ul>
     */
    private BooleanBlock entireBlock(Page page) {
        int lastFullBlockIdx = 0;
        while (true) {
            BooleanBlock lastFullBlock = (BooleanBlock) evaluators.get(lastFullBlockIdx++).eval(page);
            // A non-null Vector means the block contains no nulls, so it's the final
            // answer. The last evaluator's result is also final, nulls and all.
            if (lastFullBlockIdx == evaluators.size() || lastFullBlock.asVector() != null) {
                return lastFullBlock;
            }
            if (lastFullBlock.areAllValuesNull()) {
                // Result is all nulls and isn't the last result so we don't need any of it.
                lastFullBlock.close();
                continue;
            }
            // The result has some nulls and some non-nulls.
            return perPosition(page, lastFullBlock, lastFullBlockIdx);
        }
    }

    /**
     * Evaluate each position of the incoming {@link Page} for COALESCE
     * independently. Our attempt to evaluate entire blocks has yielded
     * a block that contains some nulls and some non-nulls and we have
     * to fill in the nulls with the results of calling the remaining
     * evaluators.
     * <p>
     * This <strong>must not</strong> return warnings caused by
     * evaluating positions for which a previous evaluator returned
     * non-null. These are positions that, at least from the perspective
     * of a compute engine user, don't <strong>have</strong> to be
     * evaluated. Put another way, this must function as though
     * {@code COALESCE} were per-position lazy. It can manage that
     * any way it likes.
     * </p>
     */
    protected abstract BooleanBlock perPosition(Page page, BooleanBlock lastFullBlock, int firstToEvaluate);

    @Override
    public final String toString() {
        return getClass().getSimpleName() + "[values=" + evaluators + ']';
    }

    @Override
    public final void close() {
        Releasables.closeExpectNoException(() -> Releasables.close(evaluators));
    }

    /**
     * Evaluates {@code COALESCE} eagerly per position if entire-block evaluation fails.
     * First we evaluate all remaining evaluators, and then we pluck the first non-null
     * value from each one. This is <strong>much</strong> faster than
     * {@link CoalesceBooleanLazyEvaluator} but will include spurious warnings if any of the
     * evaluators make them so we only use it for evaluators that are
     * {@link Factory#eagerEvalSafeInLazy safe} to evaluate eagerly
     * in a lazy environment.
     */
    static final class CoalesceBooleanEagerEvaluator extends CoalesceBooleanEvaluator {
        CoalesceBooleanEagerEvaluator(DriverContext driverContext, List<EvalOperator.ExpressionEvaluator> evaluators) {
            super(driverContext, evaluators);
        }

        @Override
        protected BooleanBlock perPosition(Page page, BooleanBlock lastFullBlock, int firstToEvaluate) {
            int positionCount = page.getPositionCount();
            // flatten[0] is the partially-null result we already computed; the rest
            // are the remaining evaluators, run eagerly over the whole page.
            BooleanBlock[] flatten = new BooleanBlock[evaluators.size() - firstToEvaluate + 1];
            try {
                flatten[0] = lastFullBlock;
                for (int f = 1; f < flatten.length; f++) {
                    flatten[f] = (BooleanBlock) evaluators.get(firstToEvaluate + f - 1).eval(page);
                }
                try (BooleanBlock.Builder result = driverContext.blockFactory().newBooleanBlockBuilder(positionCount)) {
                    position: for (int p = 0; p < positionCount; p++) {
                        for (BooleanBlock f : flatten) {
                            if (false == f.isNull(p)) {
                                result.copyFrom(f, p);
                                continue position;
                            }
                        }
                        // Every input was null at this position, so COALESCE is null too.
                        result.appendNull();
                    }
                    return result.build();
                }
            } finally {
                Releasables.close(flatten);
            }
        }
    }

    /**
     * Evaluates {@code COALESCE} lazily per position if entire-block evaluation fails.
     * For each position we either:
     * <ul>
     * <li>Take the non-null values from the {@code lastFullBlock}</li>
     * <li>
     * Evaluate the remaining evaluators one at a time, keeping
     * the first non-null value.
     * </li>
     * </ul>
     */
    static final class CoalesceBooleanLazyEvaluator extends CoalesceBooleanEvaluator {
        CoalesceBooleanLazyEvaluator(DriverContext driverContext, List<EvalOperator.ExpressionEvaluator> evaluators) {
            super(driverContext, evaluators);
        }

        @Override
        protected BooleanBlock perPosition(Page page, BooleanBlock lastFullBlock, int firstToEvaluate) {
            int positionCount = page.getPositionCount();
            try (BooleanBlock.Builder result = driverContext.blockFactory().newBooleanBlockBuilder(positionCount)) {
                position: for (int p = 0; p < positionCount; p++) {
                    if (lastFullBlock.isNull(p) == false) {
                        result.copyFrom(lastFullBlock, p, p + 1);
                        continue;
                    }
                    // Build a single-position Page so the remaining evaluators only ever
                    // see this one row; any warnings they emit are then about a position
                    // that genuinely needed evaluation.
                    int[] positions = new int[] { p };
                    Page limited = new Page(
                        1,
                        IntStream.range(0, page.getBlockCount()).mapToObj(b -> page.getBlock(b).filter(positions)).toArray(Block[]::new)
                    );
                    try (Releasable ignored = limited::releaseBlocks) {
                        for (int e = firstToEvaluate; e < evaluators.size(); e++) {
                            try (BooleanBlock block = (BooleanBlock) evaluators.get(e).eval(limited)) {
                                if (false == block.isNull(0)) {
                                    result.copyFrom(block, 0);
                                    continue position;
                                }
                            }
                        }
                        // Every remaining evaluator was null here too.
                        result.appendNull();
                    }
                }
                return result.build();
            } finally {
                lastFullBlock.close();
            }
        }
    }
}

View File

@ -0,0 +1,228 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.esql.expression.function.scalar.nulls;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.EvalOperator;
import org.elasticsearch.compute.operator.EvalOperator.ExpressionEvaluator;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.evaluator.mapper.EvaluatorMapper;
import java.util.List;
import java.util.stream.IntStream;
/**
 * {@link EvalOperator.ExpressionEvaluator} implementation for {@link Coalesce}.
 * This class is generated. Edit {@code X-InEvaluator.java.st} instead.
 */
abstract sealed class CoalesceBytesRefEvaluator implements EvalOperator.ExpressionEvaluator permits
    CoalesceBytesRefEvaluator.CoalesceBytesRefEagerEvaluator, //
    CoalesceBytesRefEvaluator.CoalesceBytesRefLazyEvaluator {

    /**
     * Build a factory for COALESCE over {@code children}. Picks the eager
     * implementation when every child evaluator is safe to run eagerly in a
     * lazy environment, and the (slower) lazy implementation otherwise.
     */
    static ExpressionEvaluator.Factory toEvaluator(EvaluatorMapper.ToEvaluator toEvaluator, List<Expression> children) {
        List<ExpressionEvaluator.Factory> childEvaluators = children.stream().map(toEvaluator::apply).toList();
        if (childEvaluators.stream().allMatch(ExpressionEvaluator.Factory::eagerEvalSafeInLazy)) {
            return new ExpressionEvaluator.Factory() {
                @Override
                public ExpressionEvaluator get(DriverContext context) {
                    return new CoalesceBytesRefEagerEvaluator(context, childEvaluators.stream().map(x -> x.get(context)).toList());
                }

                @Override
                public String toString() {
                    return "CoalesceBytesRefEagerEvaluator[values=" + childEvaluators + ']';
                }
            };
        }
        return new ExpressionEvaluator.Factory() {
            @Override
            public ExpressionEvaluator get(DriverContext context) {
                return new CoalesceBytesRefLazyEvaluator(context, childEvaluators.stream().map(x -> x.get(context)).toList());
            }

            @Override
            public String toString() {
                return "CoalesceBytesRefLazyEvaluator[values=" + childEvaluators + ']';
            }
        };
    }

    protected final DriverContext driverContext;
    protected final List<EvalOperator.ExpressionEvaluator> evaluators;

    protected CoalesceBytesRefEvaluator(DriverContext driverContext, List<EvalOperator.ExpressionEvaluator> evaluators) {
        this.driverContext = driverContext;
        this.evaluators = evaluators;
    }

    @Override
    public final BytesRefBlock eval(Page page) {
        return entireBlock(page);
    }

    /**
     * Evaluate COALESCE for an entire {@link Block} for as long as we can, then shift to
     * {@link #perPosition} evaluation.
     * <p>
     * Entire Block evaluation is the "normal" way to run the compute engine,
     * just calling {@link EvalOperator.ExpressionEvaluator#eval}. It's much faster so we try
     * that first. For each evaluator, we {@linkplain EvalOperator.ExpressionEvaluator#eval} and:
     * </p>
     * <ul>
     * <li>If the {@linkplain Block} doesn't have any nulls we return it. COALESCE done.</li>
     * <li>If the {@linkplain Block} is only nulls we skip it and try the next evaluator.</li>
     * <li>If this is the last evaluator we just return it. COALESCE done.</li>
     * <li>
     * Otherwise, the {@linkplain Block} has mixed nulls and non-nulls so we drop
     * into a per position evaluator.
     * </li>
     * </ul>
     */
    private BytesRefBlock entireBlock(Page page) {
        int lastFullBlockIdx = 0;
        while (true) {
            BytesRefBlock lastFullBlock = (BytesRefBlock) evaluators.get(lastFullBlockIdx++).eval(page);
            // A non-null Vector means the block contains no nulls, so it's the final
            // answer. The last evaluator's result is also final, nulls and all.
            if (lastFullBlockIdx == evaluators.size() || lastFullBlock.asVector() != null) {
                return lastFullBlock;
            }
            if (lastFullBlock.areAllValuesNull()) {
                // Result is all nulls and isn't the last result so we don't need any of it.
                lastFullBlock.close();
                continue;
            }
            // The result has some nulls and some non-nulls.
            return perPosition(page, lastFullBlock, lastFullBlockIdx);
        }
    }

    /**
     * Evaluate each position of the incoming {@link Page} for COALESCE
     * independently. Our attempt to evaluate entire blocks has yielded
     * a block that contains some nulls and some non-nulls and we have
     * to fill in the nulls with the results of calling the remaining
     * evaluators.
     * <p>
     * This <strong>must not</strong> return warnings caused by
     * evaluating positions for which a previous evaluator returned
     * non-null. These are positions that, at least from the perspective
     * of a compute engine user, don't <strong>have</strong> to be
     * evaluated. Put another way, this must function as though
     * {@code COALESCE} were per-position lazy. It can manage that
     * any way it likes.
     * </p>
     */
    protected abstract BytesRefBlock perPosition(Page page, BytesRefBlock lastFullBlock, int firstToEvaluate);

    @Override
    public final String toString() {
        return getClass().getSimpleName() + "[values=" + evaluators + ']';
    }

    @Override
    public final void close() {
        Releasables.closeExpectNoException(() -> Releasables.close(evaluators));
    }

    /**
     * Evaluates {@code COALESCE} eagerly per position if entire-block evaluation fails.
     * First we evaluate all remaining evaluators, and then we pluck the first non-null
     * value from each one. This is <strong>much</strong> faster than
     * {@link CoalesceBytesRefLazyEvaluator} but will include spurious warnings if any of the
     * evaluators make them so we only use it for evaluators that are
     * {@link Factory#eagerEvalSafeInLazy safe} to evaluate eagerly
     * in a lazy environment.
     */
    static final class CoalesceBytesRefEagerEvaluator extends CoalesceBytesRefEvaluator {
        CoalesceBytesRefEagerEvaluator(DriverContext driverContext, List<EvalOperator.ExpressionEvaluator> evaluators) {
            super(driverContext, evaluators);
        }

        @Override
        protected BytesRefBlock perPosition(Page page, BytesRefBlock lastFullBlock, int firstToEvaluate) {
            // Single scratch BytesRef shared across every copy to avoid per-value allocation.
            BytesRef scratch = new BytesRef();
            int positionCount = page.getPositionCount();
            // flatten[0] is the partially-null result we already computed; the rest
            // are the remaining evaluators, run eagerly over the whole page.
            BytesRefBlock[] flatten = new BytesRefBlock[evaluators.size() - firstToEvaluate + 1];
            try {
                flatten[0] = lastFullBlock;
                for (int f = 1; f < flatten.length; f++) {
                    flatten[f] = (BytesRefBlock) evaluators.get(firstToEvaluate + f - 1).eval(page);
                }
                try (BytesRefBlock.Builder result = driverContext.blockFactory().newBytesRefBlockBuilder(positionCount)) {
                    position: for (int p = 0; p < positionCount; p++) {
                        for (BytesRefBlock f : flatten) {
                            if (false == f.isNull(p)) {
                                result.copyFrom(f, p, scratch);
                                continue position;
                            }
                        }
                        // Every input was null at this position, so COALESCE is null too.
                        result.appendNull();
                    }
                    return result.build();
                }
            } finally {
                Releasables.close(flatten);
            }
        }
    }

    /**
     * Evaluates {@code COALESCE} lazily per position if entire-block evaluation fails.
     * For each position we either:
     * <ul>
     * <li>Take the non-null values from the {@code lastFullBlock}</li>
     * <li>
     * Evaluate the remaining evaluators one at a time, keeping
     * the first non-null value.
     * </li>
     * </ul>
     */
    static final class CoalesceBytesRefLazyEvaluator extends CoalesceBytesRefEvaluator {
        CoalesceBytesRefLazyEvaluator(DriverContext driverContext, List<EvalOperator.ExpressionEvaluator> evaluators) {
            super(driverContext, evaluators);
        }

        @Override
        protected BytesRefBlock perPosition(Page page, BytesRefBlock lastFullBlock, int firstToEvaluate) {
            // Single scratch BytesRef shared across every copy to avoid per-value allocation.
            BytesRef scratch = new BytesRef();
            int positionCount = page.getPositionCount();
            try (BytesRefBlock.Builder result = driverContext.blockFactory().newBytesRefBlockBuilder(positionCount)) {
                position: for (int p = 0; p < positionCount; p++) {
                    if (lastFullBlock.isNull(p) == false) {
                        result.copyFrom(lastFullBlock, p, p + 1);
                        continue;
                    }
                    // Build a single-position Page so the remaining evaluators only ever
                    // see this one row; any warnings they emit are then about a position
                    // that genuinely needed evaluation.
                    int[] positions = new int[] { p };
                    Page limited = new Page(
                        1,
                        IntStream.range(0, page.getBlockCount()).mapToObj(b -> page.getBlock(b).filter(positions)).toArray(Block[]::new)
                    );
                    try (Releasable ignored = limited::releaseBlocks) {
                        for (int e = firstToEvaluate; e < evaluators.size(); e++) {
                            try (BytesRefBlock block = (BytesRefBlock) evaluators.get(e).eval(limited)) {
                                if (false == block.isNull(0)) {
                                    result.copyFrom(block, 0, scratch);
                                    continue position;
                                }
                            }
                        }
                        // Every remaining evaluator was null here too.
                        result.appendNull();
                    }
                }
                return result.build();
            } finally {
                lastFullBlock.close();
            }
        }
    }
}

View File

@ -0,0 +1,225 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.esql.expression.function.scalar.nulls;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.DoubleBlock;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.EvalOperator;
import org.elasticsearch.compute.operator.EvalOperator.ExpressionEvaluator;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.evaluator.mapper.EvaluatorMapper;
import java.util.List;
import java.util.stream.IntStream;
/**
 * {@link EvalOperator.ExpressionEvaluator} implementation for {@link Coalesce}.
 * This class is generated. Edit {@code X-InEvaluator.java.st} instead.
 */
abstract sealed class CoalesceDoubleEvaluator implements EvalOperator.ExpressionEvaluator permits
    CoalesceDoubleEvaluator.CoalesceDoubleEagerEvaluator, //
    CoalesceDoubleEvaluator.CoalesceDoubleLazyEvaluator {

    /**
     * Build a factory for COALESCE over {@code children}. Picks the eager
     * implementation when every child evaluator is safe to run eagerly in a
     * lazy environment, and the (slower) lazy implementation otherwise.
     */
    static ExpressionEvaluator.Factory toEvaluator(EvaluatorMapper.ToEvaluator toEvaluator, List<Expression> children) {
        List<ExpressionEvaluator.Factory> childEvaluators = children.stream().map(toEvaluator::apply).toList();
        if (childEvaluators.stream().allMatch(ExpressionEvaluator.Factory::eagerEvalSafeInLazy)) {
            return new ExpressionEvaluator.Factory() {
                @Override
                public ExpressionEvaluator get(DriverContext context) {
                    return new CoalesceDoubleEagerEvaluator(context, childEvaluators.stream().map(x -> x.get(context)).toList());
                }

                @Override
                public String toString() {
                    return "CoalesceDoubleEagerEvaluator[values=" + childEvaluators + ']';
                }
            };
        }
        return new ExpressionEvaluator.Factory() {
            @Override
            public ExpressionEvaluator get(DriverContext context) {
                return new CoalesceDoubleLazyEvaluator(context, childEvaluators.stream().map(x -> x.get(context)).toList());
            }

            @Override
            public String toString() {
                return "CoalesceDoubleLazyEvaluator[values=" + childEvaluators + ']';
            }
        };
    }

    protected final DriverContext driverContext;
    protected final List<EvalOperator.ExpressionEvaluator> evaluators;

    protected CoalesceDoubleEvaluator(DriverContext driverContext, List<EvalOperator.ExpressionEvaluator> evaluators) {
        this.driverContext = driverContext;
        this.evaluators = evaluators;
    }

    @Override
    public final DoubleBlock eval(Page page) {
        return entireBlock(page);
    }

    /**
     * Evaluate COALESCE for an entire {@link Block} for as long as we can, then shift to
     * {@link #perPosition} evaluation.
     * <p>
     * Entire Block evaluation is the "normal" way to run the compute engine,
     * just calling {@link EvalOperator.ExpressionEvaluator#eval}. It's much faster so we try
     * that first. For each evaluator, we {@linkplain EvalOperator.ExpressionEvaluator#eval} and:
     * </p>
     * <ul>
     * <li>If the {@linkplain Block} doesn't have any nulls we return it. COALESCE done.</li>
     * <li>If the {@linkplain Block} is only nulls we skip it and try the next evaluator.</li>
     * <li>If this is the last evaluator we just return it. COALESCE done.</li>
     * <li>
     * Otherwise, the {@linkplain Block} has mixed nulls and non-nulls so we drop
     * into a per position evaluator.
     * </li>
     * </ul>
     */
    private DoubleBlock entireBlock(Page page) {
        int lastFullBlockIdx = 0;
        while (true) {
            DoubleBlock lastFullBlock = (DoubleBlock) evaluators.get(lastFullBlockIdx++).eval(page);
            // A non-null Vector means the block contains no nulls, so it's the final
            // answer. The last evaluator's result is also final, nulls and all.
            if (lastFullBlockIdx == evaluators.size() || lastFullBlock.asVector() != null) {
                return lastFullBlock;
            }
            if (lastFullBlock.areAllValuesNull()) {
                // Result is all nulls and isn't the last result so we don't need any of it.
                lastFullBlock.close();
                continue;
            }
            // The result has some nulls and some non-nulls.
            return perPosition(page, lastFullBlock, lastFullBlockIdx);
        }
    }

    /**
     * Evaluate each position of the incoming {@link Page} for COALESCE
     * independently. Our attempt to evaluate entire blocks has yielded
     * a block that contains some nulls and some non-nulls and we have
     * to fill in the nulls with the results of calling the remaining
     * evaluators.
     * <p>
     * This <strong>must not</strong> return warnings caused by
     * evaluating positions for which a previous evaluator returned
     * non-null. These are positions that, at least from the perspective
     * of a compute engine user, don't <strong>have</strong> to be
     * evaluated. Put another way, this must function as though
     * {@code COALESCE} were per-position lazy. It can manage that
     * any way it likes.
     * </p>
     */
    protected abstract DoubleBlock perPosition(Page page, DoubleBlock lastFullBlock, int firstToEvaluate);

    @Override
    public final String toString() {
        return getClass().getSimpleName() + "[values=" + evaluators + ']';
    }

    @Override
    public final void close() {
        Releasables.closeExpectNoException(() -> Releasables.close(evaluators));
    }

    /**
     * Evaluates {@code COALESCE} eagerly per position if entire-block evaluation fails.
     * First we evaluate all remaining evaluators, and then we pluck the first non-null
     * value from each one. This is <strong>much</strong> faster than
     * {@link CoalesceDoubleLazyEvaluator} but will include spurious warnings if any of the
     * evaluators make them so we only use it for evaluators that are
     * {@link Factory#eagerEvalSafeInLazy safe} to evaluate eagerly
     * in a lazy environment.
     */
    static final class CoalesceDoubleEagerEvaluator extends CoalesceDoubleEvaluator {
        CoalesceDoubleEagerEvaluator(DriverContext driverContext, List<EvalOperator.ExpressionEvaluator> evaluators) {
            super(driverContext, evaluators);
        }

        @Override
        protected DoubleBlock perPosition(Page page, DoubleBlock lastFullBlock, int firstToEvaluate) {
            int positionCount = page.getPositionCount();
            // flatten[0] is the partially-null result we already computed; the rest
            // are the remaining evaluators, run eagerly over the whole page.
            DoubleBlock[] flatten = new DoubleBlock[evaluators.size() - firstToEvaluate + 1];
            try {
                flatten[0] = lastFullBlock;
                for (int f = 1; f < flatten.length; f++) {
                    flatten[f] = (DoubleBlock) evaluators.get(firstToEvaluate + f - 1).eval(page);
                }
                try (DoubleBlock.Builder result = driverContext.blockFactory().newDoubleBlockBuilder(positionCount)) {
                    position: for (int p = 0; p < positionCount; p++) {
                        for (DoubleBlock f : flatten) {
                            if (false == f.isNull(p)) {
                                result.copyFrom(f, p);
                                continue position;
                            }
                        }
                        // Every input was null at this position, so COALESCE is null too.
                        result.appendNull();
                    }
                    return result.build();
                }
            } finally {
                Releasables.close(flatten);
            }
        }
    }

    /**
     * Evaluates {@code COALESCE} lazily per position if entire-block evaluation fails.
     * For each position we either:
     * <ul>
     * <li>Take the non-null values from the {@code lastFullBlock}</li>
     * <li>
     * Evaluate the remaining evaluators one at a time, keeping
     * the first non-null value.
     * </li>
     * </ul>
     */
    static final class CoalesceDoubleLazyEvaluator extends CoalesceDoubleEvaluator {
        CoalesceDoubleLazyEvaluator(DriverContext driverContext, List<EvalOperator.ExpressionEvaluator> evaluators) {
            super(driverContext, evaluators);
        }

        @Override
        protected DoubleBlock perPosition(Page page, DoubleBlock lastFullBlock, int firstToEvaluate) {
            int positionCount = page.getPositionCount();
            try (DoubleBlock.Builder result = driverContext.blockFactory().newDoubleBlockBuilder(positionCount)) {
                position: for (int p = 0; p < positionCount; p++) {
                    if (lastFullBlock.isNull(p) == false) {
                        result.copyFrom(lastFullBlock, p, p + 1);
                        continue;
                    }
                    // Build a single-position Page so the remaining evaluators only ever
                    // see this one row; any warnings they emit are then about a position
                    // that genuinely needed evaluation.
                    int[] positions = new int[] { p };
                    Page limited = new Page(
                        1,
                        IntStream.range(0, page.getBlockCount()).mapToObj(b -> page.getBlock(b).filter(positions)).toArray(Block[]::new)
                    );
                    try (Releasable ignored = limited::releaseBlocks) {
                        for (int e = firstToEvaluate; e < evaluators.size(); e++) {
                            try (DoubleBlock block = (DoubleBlock) evaluators.get(e).eval(limited)) {
                                if (false == block.isNull(0)) {
                                    result.copyFrom(block, 0);
                                    continue position;
                                }
                            }
                        }
                        // Every remaining evaluator was null here too.
                        result.appendNull();
                    }
                }
                return result.build();
            } finally {
                lastFullBlock.close();
            }
        }
    }
}

View File

@ -0,0 +1,225 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.esql.expression.function.scalar.nulls;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.EvalOperator;
import org.elasticsearch.compute.operator.EvalOperator.ExpressionEvaluator;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.evaluator.mapper.EvaluatorMapper;
import java.util.List;
import java.util.stream.IntStream;
/**
* {@link EvalOperator.ExpressionEvaluator} implementation for {@link Coalesce}.
* This class is generated. Edit {@code X-InEvaluator.java.st} instead.
*/
abstract sealed class CoalesceIntEvaluator implements EvalOperator.ExpressionEvaluator permits
CoalesceIntEvaluator.CoalesceIntEagerEvaluator, //
CoalesceIntEvaluator.CoalesceIntLazyEvaluator {
static ExpressionEvaluator.Factory toEvaluator(EvaluatorMapper.ToEvaluator toEvaluator, List<Expression> children) {
List<ExpressionEvaluator.Factory> childEvaluators = children.stream().map(toEvaluator::apply).toList();
if (childEvaluators.stream().allMatch(ExpressionEvaluator.Factory::eagerEvalSafeInLazy)) {
return new ExpressionEvaluator.Factory() {
@Override
public ExpressionEvaluator get(DriverContext context) {
return new CoalesceIntEagerEvaluator(context, childEvaluators.stream().map(x -> x.get(context)).toList());
}
@Override
public String toString() {
return "CoalesceIntEagerEvaluator[values=" + childEvaluators + ']';
}
};
}
return new ExpressionEvaluator.Factory() {
@Override
public ExpressionEvaluator get(DriverContext context) {
return new CoalesceIntLazyEvaluator(context, childEvaluators.stream().map(x -> x.get(context)).toList());
}
@Override
public String toString() {
return "CoalesceIntLazyEvaluator[values=" + childEvaluators + ']';
}
};
}
protected final DriverContext driverContext;
protected final List<EvalOperator.ExpressionEvaluator> evaluators;
protected CoalesceIntEvaluator(DriverContext driverContext, List<EvalOperator.ExpressionEvaluator> evaluators) {
this.driverContext = driverContext;
this.evaluators = evaluators;
}
@Override
public final IntBlock eval(Page page) {
return entireBlock(page);
}
/**
* Evaluate COALESCE for an entire {@link Block} for as long as we can, then shift to
* {@link #perPosition} evaluation.
* <p>
* Entire Block evaluation is the "normal" way to run the compute engine,
* just calling {@link EvalOperator.ExpressionEvaluator#eval}. It's much faster so we try
* that first. For each evaluator, we {@linkplain EvalOperator.ExpressionEvaluator#eval} and:
* </p>
* <ul>
* <li>If the {@linkplain Block} doesn't have any nulls we return it. COALESCE done.</li>
* <li>If the {@linkplain Block} is only nulls we skip it and try the next evaluator.</li>
* <li>If this is the last evaluator we just return it. COALESCE done.</li>
* <li>
* Otherwise, the {@linkplain Block} has mixed nulls and non-nulls so we drop
* into a per position evaluator.
* </li>
* </ul>
*/
private IntBlock entireBlock(Page page) {
int lastFullBlockIdx = 0;
while (true) {
IntBlock lastFullBlock = (IntBlock) evaluators.get(lastFullBlockIdx++).eval(page);
if (lastFullBlockIdx == evaluators.size() || lastFullBlock.asVector() != null) {
return lastFullBlock;
}
if (lastFullBlock.areAllValuesNull()) {
// Result is all nulls and isn't the last result so we don't need any of it.
lastFullBlock.close();
continue;
}
// The result has some nulls and some non-nulls.
return perPosition(page, lastFullBlock, lastFullBlockIdx);
}
}
/**
* Evaluate each position of the incoming {@link Page} for COALESCE
* independently. Our attempt to evaluate entire blocks has yielded
* a block that contains some nulls and some non-nulls and we have
* to fill in the nulls with the results of calling the remaining
* evaluators.
* <p>
* This <strong>must not</strong> return warnings caused by
* evaluating positions for which a previous evaluator returned
* non-null. These are positions that, at least from the perspective
* of a compute engine user, don't <strong>have</strong> to be
* evaluated. Put another way, this must function as though
* {@code COALESCE} were per-position lazy. It can manage that
* any way it likes.
* </p>
*/
protected abstract IntBlock perPosition(Page page, IntBlock lastFullBlock, int firstToEvaluate);
@Override
public final String toString() {
return getClass().getSimpleName() + "[values=" + evaluators + ']';
}
@Override
public final void close() {
Releasables.closeExpectNoException(() -> Releasables.close(evaluators));
}
    /**
     * Evaluates {@code COALESCE} eagerly per position if entire-block evaluation fails.
     * First we evaluate all remaining evaluators, and then we pluck the first non-null
     * value from each one. This is <strong>much</strong> faster than
     * {@link CoalesceIntLazyEvaluator} but will include spurious warnings if any of the
     * evaluators make them so we only use it for evaluators that are
     * {@link Factory#eagerEvalSafeInLazy safe} to evaluate eagerly
     * in a lazy environment.
     */
    static final class CoalesceIntEagerEvaluator extends CoalesceIntEvaluator {
        CoalesceIntEagerEvaluator(DriverContext driverContext, List<EvalOperator.ExpressionEvaluator> evaluators) {
            super(driverContext, evaluators);
        }

        @Override
        protected IntBlock perPosition(Page page, IntBlock lastFullBlock, int firstToEvaluate) {
            int positionCount = page.getPositionCount();
            // flatten[0] is the partially-null result we already computed; the rest
            // are the remaining evaluators, run eagerly over the whole page.
            IntBlock[] flatten = new IntBlock[evaluators.size() - firstToEvaluate + 1];
            try {
                flatten[0] = lastFullBlock;
                for (int f = 1; f < flatten.length; f++) {
                    flatten[f] = (IntBlock) evaluators.get(firstToEvaluate + f - 1).eval(page);
                }
                try (IntBlock.Builder result = driverContext.blockFactory().newIntBlockBuilder(positionCount)) {
                    position: for (int p = 0; p < positionCount; p++) {
                        for (IntBlock f : flatten) {
                            if (false == f.isNull(p)) {
                                result.copyFrom(f, p);
                                continue position;
                            }
                        }
                        // Every input was null at this position, so COALESCE is null too.
                        result.appendNull();
                    }
                    return result.build();
                }
            } finally {
                Releasables.close(flatten);
            }
        }
    }
/**
 * Lazy per-position implementation of {@code COALESCE}, used when whole-block
 * evaluation hands back a mix of nulls and non-nulls. For each position we
 * either:
 * <ul>
 * <li>Copy the non-null value from the {@code lastFullBlock}, or</li>
 * <li>
 * Evaluate the remaining evaluators against a one-position slice of the
 * page, one at a time, keeping the first non-null value.
 * </li>
 * </ul>
 */
static final class CoalesceIntLazyEvaluator extends CoalesceIntEvaluator {
    CoalesceIntLazyEvaluator(DriverContext driverContext, List<EvalOperator.ExpressionEvaluator> evaluators) {
        super(driverContext, evaluators);
    }

    @Override
    protected IntBlock perPosition(Page page, IntBlock lastFullBlock, int firstToEvaluate) {
        int positionCount = page.getPositionCount();
        try (IntBlock.Builder result = driverContext.blockFactory().newIntBlockBuilder(positionCount)) {
            for (int p = 0; p < positionCount; p++) {
                if (lastFullBlock.isNull(p)) {
                    evalPosition(page, p, firstToEvaluate, result);
                } else {
                    // Already have a value for this position; copy it straight over.
                    result.copyFrom(lastFullBlock, p, p + 1);
                }
            }
            return result.build();
        } finally {
            // We own lastFullBlock and must release it no matter what.
            lastFullBlock.close();
        }
    }

    /**
     * Run the remaining evaluators against a single-position slice of
     * {@code page}, appending the first non-null value to {@code result}, or a
     * null if every remaining evaluator returns null. Slicing to one position
     * keeps warnings from leaking out of positions that never needed to be
     * evaluated.
     */
    private void evalPosition(Page page, int p, int firstToEvaluate, IntBlock.Builder result) {
        int[] positions = new int[] { p };
        Block[] sliced = new Block[page.getBlockCount()];
        for (int b = 0; b < sliced.length; b++) {
            sliced[b] = page.getBlock(b).filter(positions);
        }
        Page limited = new Page(1, sliced);
        try (Releasable ignored = limited::releaseBlocks) {
            for (int e = firstToEvaluate; e < evaluators.size(); e++) {
                try (IntBlock block = (IntBlock) evaluators.get(e).eval(limited)) {
                    if (block.isNull(0) == false) {
                        result.copyFrom(block, 0);
                        return;
                    }
                }
            }
            result.appendNull();
        }
    }
}
}

View File

@ -0,0 +1,225 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.esql.expression.function.scalar.nulls;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.EvalOperator;
import org.elasticsearch.compute.operator.EvalOperator.ExpressionEvaluator;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.evaluator.mapper.EvaluatorMapper;
import java.util.List;
import java.util.stream.IntStream;
/**
 * {@link EvalOperator.ExpressionEvaluator} implementation for {@link Coalesce}.
 * This class is generated. Edit {@code X-InEvaluator.java.st} instead.
 * <p>
 * {@code COALESCE} must appear per-position lazy: positions that already have
 * a non-null value must never produce warnings from later arguments. Two
 * implementations are provided - an eager one for when every child evaluator
 * is safe to run ahead of time, and a lazy position-at-a-time fallback.
 * </p>
 */
abstract sealed class CoalesceLongEvaluator implements EvalOperator.ExpressionEvaluator permits
    CoalesceLongEvaluator.CoalesceLongEagerEvaluator, //
    CoalesceLongEvaluator.CoalesceLongLazyEvaluator {

    /**
     * Build a factory for COALESCE over {@code children}, selecting
     * {@link CoalesceLongEagerEvaluator} when every child is
     * {@link ExpressionEvaluator.Factory#eagerEvalSafeInLazy safe} to evaluate
     * eagerly and {@link CoalesceLongLazyEvaluator} otherwise.
     */
    static ExpressionEvaluator.Factory toEvaluator(EvaluatorMapper.ToEvaluator toEvaluator, List<Expression> children) {
        List<ExpressionEvaluator.Factory> childEvaluators = children.stream().map(toEvaluator::apply).toList();
        if (childEvaluators.stream().allMatch(ExpressionEvaluator.Factory::eagerEvalSafeInLazy)) {
            return new ExpressionEvaluator.Factory() {
                @Override
                public ExpressionEvaluator get(DriverContext context) {
                    return new CoalesceLongEagerEvaluator(context, childEvaluators.stream().map(x -> x.get(context)).toList());
                }

                @Override
                public String toString() {
                    return "CoalesceLongEagerEvaluator[values=" + childEvaluators + ']';
                }
            };
        }
        return new ExpressionEvaluator.Factory() {
            @Override
            public ExpressionEvaluator get(DriverContext context) {
                return new CoalesceLongLazyEvaluator(context, childEvaluators.stream().map(x -> x.get(context)).toList());
            }

            @Override
            public String toString() {
                return "CoalesceLongLazyEvaluator[values=" + childEvaluators + ']';
            }
        };
    }

    protected final DriverContext driverContext;
    // Child evaluators, one per COALESCE argument, in argument order.
    protected final List<EvalOperator.ExpressionEvaluator> evaluators;

    protected CoalesceLongEvaluator(DriverContext driverContext, List<EvalOperator.ExpressionEvaluator> evaluators) {
        this.driverContext = driverContext;
        this.evaluators = evaluators;
    }

    @Override
    public final LongBlock eval(Page page) {
        return entireBlock(page);
    }

    /**
     * Evaluate COALESCE for an entire {@link Block} for as long as we can, then shift to
     * {@link #perPosition} evaluation.
     * <p>
     * Entire Block evaluation is the "normal" way to run the compute engine,
     * just calling {@link EvalOperator.ExpressionEvaluator#eval}. It's much faster so we try
     * that first. For each evaluator, we {@linkplain EvalOperator.ExpressionEvaluator#eval} and:
     * </p>
     * <ul>
     * <li>If the {@linkplain Block} doesn't have any nulls we return it. COALESCE done.</li>
     * <li>If the {@linkplain Block} is only nulls we skip it and try the next evaluator.</li>
     * <li>If this is the last evaluator we just return it. COALESCE done.</li>
     * <li>
     * Otherwise, the {@linkplain Block} has mixed nulls and non-nulls so we drop
     * into a per position evaluator.
     * </li>
     * </ul>
     */
    private LongBlock entireBlock(Page page) {
        int lastFullBlockIdx = 0;
        while (true) {
            LongBlock lastFullBlock = (LongBlock) evaluators.get(lastFullBlockIdx++).eval(page);
            // A non-null vector means the block has no nulls at all, so it's the answer.
            if (lastFullBlockIdx == evaluators.size() || lastFullBlock.asVector() != null) {
                return lastFullBlock;
            }
            if (lastFullBlock.areAllValuesNull()) {
                // Result is all nulls and isn't the last result so we don't need any of it.
                lastFullBlock.close();
                continue;
            }
            // The result has some nulls and some non-nulls.
            return perPosition(page, lastFullBlock, lastFullBlockIdx);
        }
    }

    /**
     * Evaluate each position of the incoming {@link Page} for COALESCE
     * independently. Our attempt to evaluate entire blocks has yielded
     * a block that contains some nulls and some non-nulls and we have
     * to fill in the nulls with the results of calling the remaining
     * evaluators.
     * <p>
     * This <strong>must not</strong> return warnings caused by
     * evaluating positions for which a previous evaluator returned
     * non-null. These are positions that, at least from the perspective
     * of a compute engine user, don't <strong>have</strong> to be
     * evaluated. Put another way, this must function as though
     * {@code COALESCE} were per-position lazy. It can manage that
     * any way it likes.
     * </p>
     * @param page the incoming data
     * @param lastFullBlock the mixed null/non-null result of the last evaluator that ran;
     *     implementations take ownership and must release it
     * @param firstToEvaluate index of the first evaluator that has not yet run
     */
    protected abstract LongBlock perPosition(Page page, LongBlock lastFullBlock, int firstToEvaluate);

    @Override
    public final String toString() {
        return getClass().getSimpleName() + "[values=" + evaluators + ']';
    }

    @Override
    public final void close() {
        // Release every child evaluator; close() itself must not throw.
        Releasables.closeExpectNoException(() -> Releasables.close(evaluators));
    }

    /**
     * Evaluates {@code COALESCE} eagerly per position if entire-block evaluation fails.
     * First we evaluate all remaining evaluators, and then we pluck the first non-null
     * value from each one. This is <strong>much</strong> faster than
     * {@link CoalesceLongLazyEvaluator} but will include spurious warnings if any of the
     * evaluators make them, so we only use it for evaluators that are
     * {@link Factory#eagerEvalSafeInLazy safe} to evaluate eagerly
     * in a lazy environment.
     */
    static final class CoalesceLongEagerEvaluator extends CoalesceLongEvaluator {
        CoalesceLongEagerEvaluator(DriverContext driverContext, List<EvalOperator.ExpressionEvaluator> evaluators) {
            super(driverContext, evaluators);
        }

        @Override
        protected LongBlock perPosition(Page page, LongBlock lastFullBlock, int firstToEvaluate) {
            int positionCount = page.getPositionCount();
            // flatten[0] is the partial result we already have; the rest are the
            // eagerly evaluated remaining arguments. The finally below releases
            // all of them, including lastFullBlock, which we own.
            LongBlock[] flatten = new LongBlock[evaluators.size() - firstToEvaluate + 1];
            try {
                flatten[0] = lastFullBlock;
                for (int f = 1; f < flatten.length; f++) {
                    flatten[f] = (LongBlock) evaluators.get(firstToEvaluate + f - 1).eval(page);
                }
                try (LongBlock.Builder result = driverContext.blockFactory().newLongBlockBuilder(positionCount)) {
                    position: for (int p = 0; p < positionCount; p++) {
                        // Keep the first non-null value for this position.
                        for (LongBlock f : flatten) {
                            if (false == f.isNull(p)) {
                                result.copyFrom(f, p);
                                continue position;
                            }
                        }
                        result.appendNull();
                    }
                    return result.build();
                }
            } finally {
                Releasables.close(flatten);
            }
        }
    }

    /**
     * Evaluates {@code COALESCE} lazily per position if entire-block evaluation fails.
     * For each position we either:
     * <ul>
     * <li>Take the non-null values from the {@code lastFullBlock}</li>
     * <li>
     * Evaluate the remaining evaluators one at a time, keeping
     * the first non-null value.
     * </li>
     * </ul>
     */
    static final class CoalesceLongLazyEvaluator extends CoalesceLongEvaluator {
        CoalesceLongLazyEvaluator(DriverContext driverContext, List<EvalOperator.ExpressionEvaluator> evaluators) {
            super(driverContext, evaluators);
        }

        @Override
        protected LongBlock perPosition(Page page, LongBlock lastFullBlock, int firstToEvaluate) {
            int positionCount = page.getPositionCount();
            try (LongBlock.Builder result = driverContext.blockFactory().newLongBlockBuilder(positionCount)) {
                position: for (int p = 0; p < positionCount; p++) {
                    if (lastFullBlock.isNull(p) == false) {
                        // Already have a value for this position; copy it straight over.
                        result.copyFrom(lastFullBlock, p, p + 1);
                        continue;
                    }
                    // Slice the page down to this one position so later evaluators
                    // can't emit warnings for positions they didn't need to touch.
                    int[] positions = new int[] { p };
                    Page limited = new Page(
                        1,
                        IntStream.range(0, page.getBlockCount()).mapToObj(b -> page.getBlock(b).filter(positions)).toArray(Block[]::new)
                    );
                    try (Releasable ignored = limited::releaseBlocks) {
                        for (int e = firstToEvaluate; e < evaluators.size(); e++) {
                            try (LongBlock block = (LongBlock) evaluators.get(e).eval(limited)) {
                                if (false == block.isNull(0)) {
                                    result.copyFrom(block, 0);
                                    continue position;
                                }
                            }
                        }
                        result.appendNull();
                    }
                }
                return result.build();
            } finally {
                // We own lastFullBlock and must release it no matter what.
                lastFullBlock.close();
            }
        }
    }
}

View File

@ -524,6 +524,9 @@ public final class Case extends EsqlScalarFunction {
) {
for (int p = 0; p < lhs.getPositionCount(); p++) {
if (lhsOrRhs.mask().getBoolean(p)) {
// TODO Copy the per-type specialization that COALESCE has.
// There's also a slowdown because copying from a block checks to see if there are any nulls and that's slow.
// Vectors do not, so this still shows as fairly fast. But not as fast as the per-type unrolling.
builder.copyFrom(lhs, p, p + 1);
} else {
builder.copyFrom(rhs, p, p + 1);

View File

@ -11,13 +11,8 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.ElementType;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.EvalOperator;
import org.elasticsearch.compute.operator.EvalOperator.ExpressionEvaluator;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.Expressions;
import org.elasticsearch.xpack.esql.core.expression.Nullability;
@ -31,17 +26,29 @@ import org.elasticsearch.xpack.esql.expression.function.OptionalArgument;
import org.elasticsearch.xpack.esql.expression.function.Param;
import org.elasticsearch.xpack.esql.expression.function.scalar.EsqlScalarFunction;
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
import java.io.IOException;
import java.util.List;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import static org.elasticsearch.xpack.esql.core.type.DataType.BOOLEAN;
import static org.elasticsearch.xpack.esql.core.type.DataType.CARTESIAN_POINT;
import static org.elasticsearch.xpack.esql.core.type.DataType.CARTESIAN_SHAPE;
import static org.elasticsearch.xpack.esql.core.type.DataType.DATETIME;
import static org.elasticsearch.xpack.esql.core.type.DataType.DATE_NANOS;
import static org.elasticsearch.xpack.esql.core.type.DataType.DOUBLE;
import static org.elasticsearch.xpack.esql.core.type.DataType.GEO_POINT;
import static org.elasticsearch.xpack.esql.core.type.DataType.GEO_SHAPE;
import static org.elasticsearch.xpack.esql.core.type.DataType.INTEGER;
import static org.elasticsearch.xpack.esql.core.type.DataType.IP;
import static org.elasticsearch.xpack.esql.core.type.DataType.KEYWORD;
import static org.elasticsearch.xpack.esql.core.type.DataType.LONG;
import static org.elasticsearch.xpack.esql.core.type.DataType.NULL;
import static org.elasticsearch.xpack.esql.core.type.DataType.VERSION;
/**
* Function returning the first non-null value.
* Function returning the first non-null value. {@code COALESCE} runs as though
* it were lazily evaluating each position in each incoming {@link Block}.
*/
public class Coalesce extends EsqlScalarFunction implements OptionalArgument {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "Coalesce", Coalesce::new);
@ -194,70 +201,16 @@ public class Coalesce extends EsqlScalarFunction implements OptionalArgument {
@Override
public ExpressionEvaluator.Factory toEvaluator(ToEvaluator toEvaluator) {
List<ExpressionEvaluator.Factory> childEvaluators = children().stream().map(toEvaluator::apply).toList();
return new ExpressionEvaluator.Factory() {
@Override
public ExpressionEvaluator get(DriverContext context) {
return new CoalesceEvaluator(
context,
PlannerUtils.toElementType(dataType()),
childEvaluators.stream().map(x -> x.get(context)).toList()
);
}
@Override
public String toString() {
return "CoalesceEvaluator[values=" + childEvaluators + ']';
}
return switch (dataType()) {
case BOOLEAN -> CoalesceBooleanEvaluator.toEvaluator(toEvaluator, children());
case DOUBLE, COUNTER_DOUBLE -> CoalesceDoubleEvaluator.toEvaluator(toEvaluator, children());
case INTEGER, COUNTER_INTEGER -> CoalesceIntEvaluator.toEvaluator(toEvaluator, children());
case LONG, DATE_NANOS, DATETIME, COUNTER_LONG, UNSIGNED_LONG -> CoalesceLongEvaluator.toEvaluator(toEvaluator, children());
case KEYWORD, TEXT, SEMANTIC_TEXT, CARTESIAN_POINT, CARTESIAN_SHAPE, GEO_POINT, GEO_SHAPE, IP, VERSION ->
CoalesceBytesRefEvaluator.toEvaluator(toEvaluator, children());
case NULL -> EvalOperator.CONSTANT_NULL_FACTORY;
case UNSUPPORTED, SHORT, BYTE, DATE_PERIOD, OBJECT, DOC_DATA_TYPE, SOURCE, TIME_DURATION, FLOAT, HALF_FLOAT, TSID_DATA_TYPE,
SCALED_FLOAT, PARTIAL_AGG -> throw new UnsupportedOperationException(dataType() + " can't be coalesced");
};
}
private record CoalesceEvaluator(DriverContext driverContext, ElementType resultType, List<EvalOperator.ExpressionEvaluator> evaluators)
implements
EvalOperator.ExpressionEvaluator {
@Override
public Block eval(Page page) {
/*
* We have to evaluate lazily so any errors or warnings that would be
* produced by the right hand side are avoided. And so if anything
* on the right hand side is slow we skip it.
*
* And it'd be good if that lazy evaluation were fast. But this
* implementation isn't. It's fairly simple - running position at
* a time - but it's not at all fast.
*/
int positionCount = page.getPositionCount();
try (Block.Builder result = resultType.newBlockBuilder(positionCount, driverContext.blockFactory())) {
position: for (int p = 0; p < positionCount; p++) {
int[] positions = new int[] { p };
Page limited = new Page(
1,
IntStream.range(0, page.getBlockCount()).mapToObj(b -> page.getBlock(b).filter(positions)).toArray(Block[]::new)
);
try (Releasable ignored = limited::releaseBlocks) {
for (EvalOperator.ExpressionEvaluator eval : evaluators) {
try (Block block = eval.eval(limited)) {
if (false == block.isNull(0)) {
result.copyFrom(block, 0, 1);
continue position;
}
}
}
result.appendNull();
}
}
return result.build();
}
}
@Override
public String toString() {
return "CoalesceEvaluator[values=" + evaluators + ']';
}
@Override
public void close() {
Releasables.closeExpectNoException(() -> Releasables.close(evaluators));
}
}
}

View File

@ -0,0 +1,234 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.esql.expression.function.scalar.nulls;
$if(BytesRef)$
import org.apache.lucene.util.BytesRef;
$endif$
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.$Type$Block;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.EvalOperator;
import org.elasticsearch.compute.operator.EvalOperator.ExpressionEvaluator;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.evaluator.mapper.EvaluatorMapper;
import java.util.List;
import java.util.stream.IntStream;
/**
 * {@link EvalOperator.ExpressionEvaluator} implementation for {@link Coalesce}.
 * This class is generated. Edit {@code X-InEvaluator.java.st} instead.
 * <p>
 * {@code COALESCE} must appear per-position lazy: positions that already have
 * a non-null value must never produce warnings from later arguments. Two
 * implementations are provided - an eager one for when every child evaluator
 * is safe to run ahead of time, and a lazy position-at-a-time fallback.
 * </p>
 */
abstract sealed class Coalesce$Type$Evaluator implements EvalOperator.ExpressionEvaluator permits
    Coalesce$Type$Evaluator.Coalesce$Type$EagerEvaluator, //
    Coalesce$Type$Evaluator.Coalesce$Type$LazyEvaluator {

    /**
     * Build a factory for COALESCE over {@code children}, selecting
     * {@link Coalesce$Type$EagerEvaluator} when every child is
     * {@link ExpressionEvaluator.Factory#eagerEvalSafeInLazy safe} to evaluate
     * eagerly and {@link Coalesce$Type$LazyEvaluator} otherwise.
     */
    static ExpressionEvaluator.Factory toEvaluator(EvaluatorMapper.ToEvaluator toEvaluator, List<Expression> children) {
        List<ExpressionEvaluator.Factory> childEvaluators = children.stream().map(toEvaluator::apply).toList();
        if (childEvaluators.stream().allMatch(ExpressionEvaluator.Factory::eagerEvalSafeInLazy)) {
            return new ExpressionEvaluator.Factory() {
                @Override
                public ExpressionEvaluator get(DriverContext context) {
                    return new Coalesce$Type$EagerEvaluator(context, childEvaluators.stream().map(x -> x.get(context)).toList());
                }

                @Override
                public String toString() {
                    return "Coalesce$Type$EagerEvaluator[values=" + childEvaluators + ']';
                }
            };
        }
        return new ExpressionEvaluator.Factory() {
            @Override
            public ExpressionEvaluator get(DriverContext context) {
                return new Coalesce$Type$LazyEvaluator(context, childEvaluators.stream().map(x -> x.get(context)).toList());
            }

            @Override
            public String toString() {
                return "Coalesce$Type$LazyEvaluator[values=" + childEvaluators + ']';
            }
        };
    }

    protected final DriverContext driverContext;
    // Child evaluators, one per COALESCE argument, in argument order.
    protected final List<EvalOperator.ExpressionEvaluator> evaluators;

    protected Coalesce$Type$Evaluator(DriverContext driverContext, List<EvalOperator.ExpressionEvaluator> evaluators) {
        this.driverContext = driverContext;
        this.evaluators = evaluators;
    }

    @Override
    public final $Type$Block eval(Page page) {
        return entireBlock(page);
    }

    /**
     * Evaluate COALESCE for an entire {@link Block} for as long as we can, then shift to
     * {@link #perPosition} evaluation.
     * <p>
     * Entire Block evaluation is the "normal" way to run the compute engine,
     * just calling {@link EvalOperator.ExpressionEvaluator#eval}. It's much faster so we try
     * that first. For each evaluator, we {@linkplain EvalOperator.ExpressionEvaluator#eval} and:
     * </p>
     * <ul>
     * <li>If the {@linkplain Block} doesn't have any nulls we return it. COALESCE done.</li>
     * <li>If the {@linkplain Block} is only nulls we skip it and try the next evaluator.</li>
     * <li>If this is the last evaluator we just return it. COALESCE done.</li>
     * <li>
     * Otherwise, the {@linkplain Block} has mixed nulls and non-nulls so we drop
     * into a per position evaluator.
     * </li>
     * </ul>
     */
    private $Type$Block entireBlock(Page page) {
        int lastFullBlockIdx = 0;
        while (true) {
            $Type$Block lastFullBlock = ($Type$Block) evaluators.get(lastFullBlockIdx++).eval(page);
            // A non-null vector means the block has no nulls at all, so it's the answer.
            if (lastFullBlockIdx == evaluators.size() || lastFullBlock.asVector() != null) {
                return lastFullBlock;
            }
            if (lastFullBlock.areAllValuesNull()) {
                // Result is all nulls and isn't the last result so we don't need any of it.
                lastFullBlock.close();
                continue;
            }
            // The result has some nulls and some non-nulls.
            return perPosition(page, lastFullBlock, lastFullBlockIdx);
        }
    }

    /**
     * Evaluate each position of the incoming {@link Page} for COALESCE
     * independently. Our attempt to evaluate entire blocks has yielded
     * a block that contains some nulls and some non-nulls and we have
     * to fill in the nulls with the results of calling the remaining
     * evaluators.
     * <p>
     * This <strong>must not</strong> return warnings caused by
     * evaluating positions for which a previous evaluator returned
     * non-null. These are positions that, at least from the perspective
     * of a compute engine user, don't <strong>have</strong> to be
     * evaluated. Put another way, this must function as though
     * {@code COALESCE} were per-position lazy. It can manage that
     * any way it likes.
     * </p>
     * @param page the incoming data
     * @param lastFullBlock the mixed null/non-null result of the last evaluator that ran;
     *     implementations take ownership and must release it
     * @param firstToEvaluate index of the first evaluator that has not yet run
     */
    protected abstract $Type$Block perPosition(Page page, $Type$Block lastFullBlock, int firstToEvaluate);

    @Override
    public final String toString() {
        return getClass().getSimpleName() + "[values=" + evaluators + ']';
    }

    @Override
    public final void close() {
        // Release every child evaluator; close() itself must not throw.
        Releasables.closeExpectNoException(() -> Releasables.close(evaluators));
    }

    /**
     * Evaluates {@code COALESCE} eagerly per position if entire-block evaluation fails.
     * First we evaluate all remaining evaluators, and then we pluck the first non-null
     * value from each one. This is <strong>much</strong> faster than
     * {@link Coalesce$Type$LazyEvaluator} but will include spurious warnings if any of the
     * evaluators make them, so we only use it for evaluators that are
     * {@link Factory#eagerEvalSafeInLazy safe} to evaluate eagerly
     * in a lazy environment.
     */
    static final class Coalesce$Type$EagerEvaluator extends Coalesce$Type$Evaluator {
        Coalesce$Type$EagerEvaluator(DriverContext driverContext, List<EvalOperator.ExpressionEvaluator> evaluators) {
            super(driverContext, evaluators);
        }

        @Override
        protected $Type$Block perPosition(Page page, $Type$Block lastFullBlock, int firstToEvaluate) {
$if(BytesRef)$
            BytesRef scratch = new BytesRef();
$endif$
            int positionCount = page.getPositionCount();
            // flatten[0] is the partial result we already have; the rest are the
            // eagerly evaluated remaining arguments. The finally below releases
            // all of them, including lastFullBlock, which we own.
            $Type$Block[] flatten = new $Type$Block[evaluators.size() - firstToEvaluate + 1];
            try {
                flatten[0] = lastFullBlock;
                for (int f = 1; f < flatten.length; f++) {
                    flatten[f] = ($Type$Block) evaluators.get(firstToEvaluate + f - 1).eval(page);
                }
                try ($Type$Block.Builder result = driverContext.blockFactory().new$Type$BlockBuilder(positionCount)) {
                    position: for (int p = 0; p < positionCount; p++) {
                        // Keep the first non-null value for this position.
                        for ($Type$Block f : flatten) {
                            if (false == f.isNull(p)) {
                                result.copyFrom(f, p$if(BytesRef)$, scratch$endif$);
                                continue position;
                            }
                        }
                        result.appendNull();
                    }
                    return result.build();
                }
            } finally {
                Releasables.close(flatten);
            }
        }
    }

    /**
     * Evaluates {@code COALESCE} lazily per position if entire-block evaluation fails.
     * For each position we either:
     * <ul>
     * <li>Take the non-null values from the {@code lastFullBlock}</li>
     * <li>
     * Evaluate the remaining evaluators one at a time, keeping
     * the first non-null value.
     * </li>
     * </ul>
     */
    static final class Coalesce$Type$LazyEvaluator extends Coalesce$Type$Evaluator {
        Coalesce$Type$LazyEvaluator(DriverContext driverContext, List<EvalOperator.ExpressionEvaluator> evaluators) {
            super(driverContext, evaluators);
        }

        @Override
        protected $Type$Block perPosition(Page page, $Type$Block lastFullBlock, int firstToEvaluate) {
$if(BytesRef)$
            BytesRef scratch = new BytesRef();
$endif$
            int positionCount = page.getPositionCount();
            try ($Type$Block.Builder result = driverContext.blockFactory().new$Type$BlockBuilder(positionCount)) {
                position: for (int p = 0; p < positionCount; p++) {
                    if (lastFullBlock.isNull(p) == false) {
                        // Already have a value for this position; copy it straight over.
                        result.copyFrom(lastFullBlock, p, p + 1);
                        continue;
                    }
                    // Slice the page down to this one position so later evaluators
                    // can't emit warnings for positions they didn't need to touch.
                    int[] positions = new int[] { p };
                    Page limited = new Page(
                        1,
                        IntStream.range(0, page.getBlockCount()).mapToObj(b -> page.getBlock(b).filter(positions)).toArray(Block[]::new)
                    );
                    try (Releasable ignored = limited::releaseBlocks) {
                        for (int e = firstToEvaluate; e < evaluators.size(); e++) {
                            try ($Type$Block block = ($Type$Block) evaluators.get(e).eval(limited)) {
                                if (false == block.isNull(0)) {
                                    result.copyFrom(block, 0$if(BytesRef)$, scratch$endif$);
                                    continue position;
                                }
                            }
                        }
                        result.appendNull();
                    }
                }
                return result.build();
            } finally {
                // We own lastFullBlock and must release it no matter what.
                lastFullBlock.close();
            }
        }
    }
}

View File

@ -12,8 +12,13 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BlockUtils;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.EvalOperator;
import org.elasticsearch.compute.test.TestBlockFactory;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
@ -29,6 +34,7 @@ import org.elasticsearch.xpack.esql.expression.function.TestCaseSupplier;
import org.elasticsearch.xpack.esql.expression.function.scalar.VaragsTestCaseBuilder;
import org.elasticsearch.xpack.esql.expression.function.scalar.spatial.SpatialRelatesFunctionTestCase;
import org.elasticsearch.xpack.esql.planner.Layout;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
import org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter;
import java.time.ZonedDateTime;
@ -40,6 +46,9 @@ import java.util.function.Supplier;
import static org.elasticsearch.compute.data.BlockUtils.toJavaObject;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.randomLiteral;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
public class CoalesceTests extends AbstractScalarFunctionTestCase {
public CoalesceTests(@Name("TestCase") Supplier<TestCaseSupplier.TestCase> testCaseSupplier) {
@ -49,7 +58,7 @@ public class CoalesceTests extends AbstractScalarFunctionTestCase {
@ParametersFactory
public static Iterable<Object[]> parameters() {
List<TestCaseSupplier> noNullsSuppliers = new ArrayList<>();
VaragsTestCaseBuilder builder = new VaragsTestCaseBuilder(type -> "Coalesce");
VaragsTestCaseBuilder builder = new VaragsTestCaseBuilder(type -> "Coalesce" + type + "Eager");
builder.expectString(strings -> strings.filter(v -> v != null).findFirst());
builder.expectLong(longs -> longs.filter(v -> v != null).findFirst());
builder.expectInt(ints -> ints.filter(v -> v != null).findFirst());
@ -64,7 +73,7 @@ public class CoalesceTests extends AbstractScalarFunctionTestCase {
new TestCaseSupplier.TypedData(first, DataType.IP, "first"),
new TestCaseSupplier.TypedData(second, DataType.IP, "second")
),
"CoalesceEvaluator[values=[Attribute[channel=0], Attribute[channel=1]]]",
"CoalesceBytesRefEagerEvaluator[values=[Attribute[channel=0], Attribute[channel=1]]]",
DataType.IP,
equalTo(first == null ? second : first)
);
@ -79,7 +88,7 @@ public class CoalesceTests extends AbstractScalarFunctionTestCase {
new TestCaseSupplier.TypedData(first, DataType.VERSION, "first"),
new TestCaseSupplier.TypedData(second, DataType.VERSION, "second")
),
"CoalesceEvaluator[values=[Attribute[channel=0], Attribute[channel=1]]]",
"CoalesceBytesRefEagerEvaluator[values=[Attribute[channel=0], Attribute[channel=1]]]",
DataType.VERSION,
equalTo(first == null ? second : first)
);
@ -92,7 +101,7 @@ public class CoalesceTests extends AbstractScalarFunctionTestCase {
new TestCaseSupplier.TypedData(firstDate, DataType.DATETIME, "first"),
new TestCaseSupplier.TypedData(secondDate, DataType.DATETIME, "second")
),
"CoalesceEvaluator[values=[Attribute[channel=0], Attribute[channel=1]]]",
"CoalesceLongEagerEvaluator[values=[Attribute[channel=0], Attribute[channel=1]]]",
DataType.DATETIME,
equalTo(firstDate == null ? secondDate : firstDate)
);
@ -105,7 +114,7 @@ public class CoalesceTests extends AbstractScalarFunctionTestCase {
new TestCaseSupplier.TypedData(firstDate, DataType.DATE_NANOS, "first"),
new TestCaseSupplier.TypedData(secondDate, DataType.DATE_NANOS, "second")
),
"CoalesceEvaluator[values=[Attribute[channel=0], Attribute[channel=1]]]",
"CoalesceLongEagerEvaluator[values=[Attribute[channel=0], Attribute[channel=1]]]",
DataType.DATE_NANOS,
equalTo(firstDate == null ? secondDate : firstDate)
);
@ -129,6 +138,20 @@ public class CoalesceTests extends AbstractScalarFunctionTestCase {
suppliers.add(new TestCaseSupplier(nullCaseName(s, nullUpTo, true), types, () -> nullCase(s.get(), finalNullUpTo, true)));
}
}
suppliers.add(
new TestCaseSupplier(
List.of(DataType.NULL, DataType.NULL),
() -> new TestCaseSupplier.TestCase(
List.of(
new TestCaseSupplier.TypedData(null, DataType.NULL, "first"),
new TestCaseSupplier.TypedData(null, DataType.NULL, "second")
),
"ConstantNull",
DataType.NULL,
nullValue()
)
)
);
return parameterSuppliersFromTypedData(suppliers);
}
@ -167,7 +190,7 @@ public class CoalesceTests extends AbstractScalarFunctionTestCase {
TestCaseSupplier.testCaseSupplier(
leftDataSupplier,
rightDataSupplier,
(l, r) -> equalTo("CoalesceEvaluator[values=[Attribute[channel=0], Attribute[channel=1]]]"),
(l, r) -> equalTo("CoalesceBytesRefEagerEvaluator[values=[Attribute[channel=0], Attribute[channel=1]]]"),
dataType,
(l, r) -> l
)
@ -235,6 +258,69 @@ public class CoalesceTests extends AbstractScalarFunctionTestCase {
sub.add(between(0, sub.size()), randomLiteral(sub.get(sub.size() - 1).dataType()));
Coalesce exp = build(Source.EMPTY, sub);
// Known not to be nullable because it contains a non-null literal
assertThat(exp.nullable(), equalTo(Nullability.FALSE));
if (testCase.expectedType() == DataType.NULL) {
assertThat(exp.nullable(), equalTo(Nullability.UNKNOWN));
} else {
assertThat(exp.nullable(), equalTo(Nullability.FALSE));
}
}
/**
 * Inserts random non-null garbage <strong>around</strong> the expected data and runs COALESCE.
 * <p>
 * This is important for catching the case where your value is null, but the rest of the block
 * isn't null. An off-by-one error in the evaluators can break this in a way that the standard
 * tests weren't catching and this does.
 * </p>
 */
public void testEvaluateWithGarbage() {
    DriverContext context = driverContext();
    // Randomly exercise both the plain and the deep-copied field expression paths.
    Expression expression = randomBoolean() ? buildDeepCopyOfFieldExpression(testCase) : buildFieldExpression(testCase);
    int positions = between(2, 1024);
    List<TestCaseSupplier.TypedData> data = testCase.getData();
    Page onePositionPage = row(testCase.getDataValues());
    // NOTE(review): the array is sized by the non-force-literal entries, but the
    // loop below iterates *all* entries; this only lines up when no entry is a
    // forced literal - confirm that holds for every COALESCE test case.
    Block[] blocks = new Block[Math.toIntExact(data.stream().filter(d -> d.isForceLiteral() == false).count())];
    // The one position that carries the real test data; everything else is garbage.
    int realPosition = between(0, positions - 1);
    try {
        int blocksIndex = 0;
        for (TestCaseSupplier.TypedData d : data) {
            blocks[blocksIndex] = blockWithRandomGarbage(
                context.blockFactory(),
                d.type(),
                onePositionPage.getBlock(blocksIndex),
                positions,
                realPosition
            );
            blocksIndex++;
        }
        try (
            EvalOperator.ExpressionEvaluator eval = evaluator(expression).get(context);
            Block block = eval.eval(new Page(positions, blocks))
        ) {
            assertThat(block.getPositionCount(), is(positions));
            // Only the "real" position is asserted on; the rest is random garbage.
            assertThat(toJavaObjectUnsignedLongAware(block, realPosition), testCase.getMatcher());
            assertThat("evaluates to tracked block", block.blockFactory(), sameInstance(context.blockFactory()));
        }
    } finally {
        Releasables.close(onePositionPage::releaseBlocks, Releasables.wrap(blocks));
    }
}
/**
 * Builds a block with {@code totalPositions} positions where every position
 * holds random non-null garbage of {@code type}, except {@code insertLocation},
 * which holds the single value from {@code singlePositionBlock}.
 */
private Block blockWithRandomGarbage(
    BlockFactory blockFactory,
    DataType type,
    Block singlePositionBlock,
    int totalPositions,
    int insertLocation
) {
    try (Block.Builder builder = PlannerUtils.toElementType(type).newBlockBuilder(totalPositions, blockFactory)) {
        for (int p = 0; p < totalPositions; p++) {
            // Garbage blocks come from the non-breaking test factory, so they
            // are not explicitly released here.
            Block copyFrom = p == insertLocation
                ? singlePositionBlock
                : BlockUtils.constantBlock(TestBlockFactory.getNonBreakingInstance(), randomLiteral(type).value(), 1);
            builder.copyFrom(copyFrom, 0, 1);
        }
        return builder.build();
    }
}
}