Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,14 @@
import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.ViewExpanders;
import org.apache.calcite.rel.BiRel;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.core.SetOp;
import org.apache.calcite.rel.core.Uncollect;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.logical.LogicalValues;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFamily;
Expand Down Expand Up @@ -681,25 +686,162 @@ public RelNode visitHead(Head node, CalcitePlanContext context) {
return context.relBuilder.peek();
}

private static final String REVERSE_ROW_NUM = "__reverse_row_num__";
/**
* Backtrack through the RelNode tree to find the first Sort node with non-empty collation. Stops
* at blocking operators that break ordering:
*
* <ul>
* <li>Aggregate - aggregation destroys input ordering
* <li>BiRel - covers Join, Correlate, and other binary relations
* <li>SetOp - covers Union, Intersect, Except
* <li>Uncollect - unnesting operation that may change ordering
* <li>Project with window functions (RexOver) - ordering determined by window's ORDER BY
* </ul>
*
* @param node the starting RelNode to backtrack from
* @return the collation found, or null if no sort or blocking operator encountered
*/
private RelCollation backtrackForCollation(RelNode node) {
while (node != null) {
// Check for blocking operators that destroy collation
// BiRel covers Join, Correlate, and other binary relations
// SetOp covers Union, Intersect, Except
// Uncollect unnests arrays/multisets which may change ordering
if (node instanceof Aggregate
|| node instanceof BiRel
|| node instanceof SetOp
|| node instanceof Uncollect) {
return null;
}

// Project with window functions has ordering determined by the window's ORDER BY clause
// We should not destroy its output order by inserting a reversed sort
if (node instanceof LogicalProject && ((LogicalProject) node).containsOver()) {
return null;
}

// Check for Sort node with collation
if (node instanceof org.apache.calcite.rel.core.Sort) {
org.apache.calcite.rel.core.Sort sort = (org.apache.calcite.rel.core.Sort) node;
if (sort.getCollation() != null && !sort.getCollation().getFieldCollations().isEmpty()) {
return sort.getCollation();
}
}

// Continue to child node
if (node.getInputs().isEmpty()) {
break;
}
node = node.getInput(0);
}
return null;
}

/**
* Insert a reversed sort node after finding the original sort in the tree. This rebuilds the tree
* with the reversed sort inserted right after the original sort.
*
* @param root the root of the tree to rebuild
* @param reversedCollation the reversed collation to insert
* @param context the Calcite plan context
* @return the rebuilt tree with reversed sort inserted
*/
private RelNode insertReversedSortInTree(
RelNode root, RelCollation reversedCollation, CalcitePlanContext context) {
return root.accept(
new org.apache.calcite.rel.RelHomogeneousShuttle() {
boolean sortFound = false;

@Override
public RelNode visit(RelNode other) {
if (!sortFound && other instanceof org.apache.calcite.rel.core.Sort) {
org.apache.calcite.rel.core.Sort sort = (org.apache.calcite.rel.core.Sort) other;
// Treat a Sort with fetch or offset as a barrier (limit node).
// Place the reversed sort above the barrier to preserve limit semantics,
// rather than inserting below the downstream collation Sort.
if (sort.fetch != null || sort.offset != null) {
sortFound = true;
RelNode visitedBarrier = super.visit(other);
return org.apache.calcite.rel.logical.LogicalSort.create(
visitedBarrier, reversedCollation, null, null);
}
// Found a collation Sort - insert reversed sort on top of it
if (sort.getCollation() != null
&& !sort.getCollation().getFieldCollations().isEmpty()) {
sortFound = true;
RelNode visitedSort = super.visit(other);
return org.apache.calcite.rel.logical.LogicalSort.create(
visitedSort, reversedCollation, null, null);
}
}
// For all other nodes, continue traversal
return super.visit(other);
}
});
}

@Override
public RelNode visitReverse(
org.opensearch.sql.ast.tree.Reverse node, CalcitePlanContext context) {
visitChildren(node, context);
// Add ROW_NUMBER() column
RexNode rowNumber =
context
.relBuilder
.aggregateCall(SqlStdOperatorTable.ROW_NUMBER)
.over()
.rowsTo(RexWindowBounds.CURRENT_ROW)
.as(REVERSE_ROW_NUM);
context.relBuilder.projectPlus(rowNumber);
// Sort by row number descending
context.relBuilder.sort(context.relBuilder.desc(context.relBuilder.field(REVERSE_ROW_NUM)));
// Remove row number column
context.relBuilder.projectExcept(context.relBuilder.field(REVERSE_ROW_NUM));

// Check if there's an existing sort to reverse
List<RelCollation> collations =
context.relBuilder.getCluster().getMetadataQuery().collations(context.relBuilder.peek());
RelCollation collation = collations != null && !collations.isEmpty() ? collations.get(0) : null;

if (collation != null && !collation.getFieldCollations().isEmpty()) {
// If there's an existing sort, reverse its direction
RelCollation reversedCollation = PlanUtils.reverseCollation(collation);
RelNode currentNode = context.relBuilder.peek();
if (currentNode instanceof org.apache.calcite.rel.core.Sort) {
org.apache.calcite.rel.core.Sort existingSort =
(org.apache.calcite.rel.core.Sort) currentNode;
if (existingSort.getCollation() != null
&& !existingSort.getCollation().getFieldCollations().isEmpty()
&& existingSort.fetch == null
&& existingSort.offset == null) {
// Pure collation sort (no fetch/offset) - replace in-place to avoid consecutive
// sorts. Calcite's physical optimizer merges consecutive LogicalSort nodes and may
// discard the reversed direction. Replacing in-place avoids this issue.
RelCollation reversedFromSort = PlanUtils.reverseCollation(existingSort.getCollation());
RelNode replacedSort =
org.apache.calcite.rel.logical.LogicalSort.create(
existingSort.getInput(), reversedFromSort, null, null);
PlanUtils.replaceTop(context.relBuilder, replacedSort);
} else {
// Sort with fetch/offset (limit) or fetch-only Sort - add a separate reversed
// sort on top so the "limit then reverse" semantics are preserved.
context.relBuilder.sort(reversedCollation);
}
} else {
context.relBuilder.sort(reversedCollation);
}
} else {
// Collation not found on current node - try backtracking
RelNode currentNode = context.relBuilder.peek();
RelCollation backtrackCollation = backtrackForCollation(currentNode);

if (backtrackCollation != null && !backtrackCollation.getFieldCollations().isEmpty()) {
// Found collation through backtracking - rebuild tree with reversed sort
RelCollation reversedCollation = PlanUtils.reverseCollation(backtrackCollation);
RelNode rebuiltTree = insertReversedSortInTree(currentNode, reversedCollation, context);
// Replace the current node in the builder with the rebuilt tree
context.relBuilder.build(); // Pop the current node
context.relBuilder.push(rebuiltTree); // Push the rebuilt tree
} else {
// Check if @timestamp field exists in the row type
List<String> fieldNames = context.relBuilder.peek().getRowType().getFieldNames();
if (fieldNames.contains(OpenSearchConstants.IMPLICIT_FIELD_TIMESTAMP)) {
// If @timestamp exists, sort by it in descending order
context.relBuilder.sort(
context.relBuilder.desc(
context.relBuilder.field(OpenSearchConstants.IMPLICIT_FIELD_TIMESTAMP)));
}
// If neither collation nor @timestamp exists, ignore the reverse command (no-op)
}
}

return context.relBuilder.peek();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.volcano.VolcanoPlanner;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.RelHomogeneousShuttle;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelShuttle;
Expand Down Expand Up @@ -593,6 +596,37 @@ public Void visitCorrelVariable(RexCorrelVariable correlVar) {
}
}

/**
* Reverses the direction of a RelCollation.
*
* @param original The original collation to reverse
* @return A new RelCollation with reversed directions
*/
public static RelCollation reverseCollation(RelCollation original) {
if (original == null || original.getFieldCollations().isEmpty()) {
return original;
}

List<RelFieldCollation> reversedFields = new ArrayList<>();
for (RelFieldCollation field : original.getFieldCollations()) {
RelFieldCollation.Direction reversedDirection = field.direction.reverse();

// Handle null direction properly - reverse it as well
RelFieldCollation.NullDirection reversedNullDirection =
field.nullDirection == RelFieldCollation.NullDirection.FIRST
? RelFieldCollation.NullDirection.LAST
: field.nullDirection == RelFieldCollation.NullDirection.LAST
? RelFieldCollation.NullDirection.FIRST
: field.nullDirection;

RelFieldCollation reversedField =
new RelFieldCollation(field.getFieldIndex(), reversedDirection, reversedNullDirection);
reversedFields.add(reversedField);
}

return RelCollations.of(reversedFields);
}

/** Adds a rel node to the top of the stack while preserving the field names and aliases. */
static void replaceTop(RelBuilder relBuilder, RelNode relNode) {
try {
Expand Down
Loading
Loading