diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java index ed7a05027e8b..5b39147f3e29 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java @@ -73,6 +73,11 @@ public void deleteWithBatch(BatchOperation batch, KEY key) throws CodecException table.deleteWithBatch(batch, key); } + @Override + public void deleteRangeWithBatch(BatchOperation batch, KEY beginKey, KEY endKey) throws CodecException { + table.deleteRangeWithBatch(batch, beginKey, endKey); + } + @Override public final KeyValueIterator iterator(KEY prefix, IteratorType type) { throw new UnsupportedOperationException("Iterating tables directly is not" + diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java index 513c732d3022..3475d5ae99b0 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java @@ -21,13 +21,19 @@ import com.google.common.base.Preconditions; import java.io.Closeable; +import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.apache.hadoop.hdds.utils.IOUtils; import org.apache.hadoop.hdds.utils.db.RocksDatabase.ColumnFamily; import org.apache.hadoop.hdds.utils.db.managed.ManagedDirectSlice; @@ -84,7 +90,7 @@ private static String countSize2String(int count, long size) { * To implement {@link #equals(Object)} and {@link #hashCode()} * based on the contents of the bytes. */ - static final class Bytes implements Closeable { + static final class Bytes implements Comparable, Closeable { private final AbstractSlice slice; /** Cache the hash value. */ private final int hash; @@ -130,6 +136,12 @@ public String toString() { return slice.toString(); } + // This method mimics the ByteWiseComparator in RocksDB. + @Override + public int compareTo(RDBBatchOperation.Bytes that) { + return this.slice.compare(that.slice); + } + @Override public void close() { slice.close(); @@ -239,6 +251,40 @@ boolean closeImpl() { } } + /** + * Delete range operation to be applied to a {@link ColumnFamily} batch. 
+ */ + private final class DeleteRangeOperation extends Op { + private final byte[] startKey; + private final byte[] endKey; + private final RangeQueryIndex.Range rangeEntry; + + private DeleteRangeOperation(byte[] startKey, byte[] endKey) { + this.startKey = Objects.requireNonNull(startKey, "startKey == null"); + this.endKey = Objects.requireNonNull(endKey, "endKey == null"); + this.rangeEntry = new RangeQueryIndex.Range<>(new Bytes(startKey), new Bytes(endKey)); + } + + @Override + public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException { + family.batchDeleteRange(batch, startKey, endKey); + } + + @Override + int totalLength() { + return startKey.length + endKey.length; + } + + @Override + boolean closeImpl() { + if (super.closeImpl()) { + IOUtils.close(LOG, rangeEntry.getStartInclusive(), rangeEntry.getEndExclusive()); + return true; + } + return false; + } + } + /** Cache and deduplicate db ops (put/delete). */ private class OpCache { /** A (family name -> {@link FamilyCache}) map. */ @@ -249,16 +295,40 @@ private class FamilyCache { private final ColumnFamily family; /** - * A mapping of keys to operations for batch processing in the {@link FamilyCache}. - * The keys are represented as {@link Bytes} objects, encapsulating the byte array or buffer - * for efficient equality and hashing. The values are instances of {@link Op}, representing - * different types of operations that can be applied to a {@link ColumnFamily}. + * A mapping of operation keys to their respective indices in {@code FamilyCache}. + * + * Key details: + * - Maintains a mapping of unique operation keys to their insertion or processing order. + * - Used internally to manage and sort operations during batch writes. + * - Facilitates filtering, overwriting, or deletion of operations based on their keys. + * + * Constraints: + * - Keys must be unique, represented using {@link Bytes}, to avoid collisions. + * - Each key is associated with a unique integer index to track insertion order. * - * This field is intended to store pending batch updates before they are written to the database. - * It supports operations such as additions and deletions while maintaining the ability to overwrite - * existing entries when necessary. + * This field plays a critical role in managing the logical consistency and proper execution + * order of operations stored in the batch when interacting with a RocksDB-backed system. */ - private final Map ops = new HashMap<>(); + private final Map singleOpKeys = new HashMap<>(); + /** + * Maintains a mapping of unique operation indices to their corresponding {@code Operation} instances. + * + * This map serves as the primary container for recording operations in preparation for a batch write + * within a RocksDB-backed system. Each operation is referenced by an integer index, which determines + * its insertion order and ensures correct sequencing during batch execution. + * + * Key characteristics: + * - Stores operations of type {@code Operation}. + * - Uses a unique integer key (index) for mapping each operation. + * - Serves as an intermediary structure during batch preparation and execution. + * + * Usage context: + * - This map is managed as part of the batch-writing process, which involves organizing, + * filtering, and applying multiple operations in a single cohesive batch. + * - Operations stored in this map are expected to define specific actions (e.g., put, delete, + * delete range) and their associated data (e.g., keys, values). 
+ */ + private final Map ops = new HashMap<>(); private boolean isCommit; private long batchSize; @@ -266,19 +336,82 @@ private class FamilyCache { private int discardedCount; private int putCount; private int delCount; + private int delRangeCount; + private AtomicInteger opIndex; FamilyCache(ColumnFamily family) { this.family = family; + this.opIndex = new AtomicInteger(0); } - /** Prepare batch write for the entire family. */ + /** + * Prepares a batch write operation for a RocksDB-backed system. + * + * This method ensures the orderly execution of operations accumulated in the batch, + * respecting their respective types and order of insertion. + * + * Key functionalities: + * 1. Ensures that the batch is not already committed before proceeding. + * 2. Sorts all operations by their `opIndex` to maintain a consistent execution order. + * 3. Filters and adapts operations to account for any delete range operations that might + * affect other operations in the batch: + * - Operations with keys that fall within the range specified by a delete range operation + * are discarded. + * - Delete range operations are executed in their correct order. + * 4. Applies remaining operations to the write batch, ensuring proper filtering and execution. + * 5. Logs a summary of the batch execution for debugging purposes. + * + * Throws: + * - RocksDatabaseException if any error occurs while applying operations to the write batch. + * + * Prerequisites: + * - The method assumes that the operations are represented by `Operation` objects, each of which + * encapsulates the logic for its specific type. + * - Delete range operations must be represented by the `DeleteRangeOperation` class. + */ void prepareBatchWrite() throws RocksDatabaseException { Preconditions.checkState(!isCommit, "%s is already committed.", this); isCommit = true; - for (Op op : ops.values()) { - op.apply(family, writeBatch); + // Sort Entries based on opIndex and flush the operation to the batch in the same order. + List opList = ops.entrySet().stream().sorted(Comparator.comparingInt(Map.Entry::getKey)) + .map(Map.Entry::getValue).collect(Collectors.toList()); + Set> deleteRangeEntries = new HashSet<>(); + for (Op op : opList) { + if (op instanceof DeleteRangeOperation) { + DeleteRangeOperation deleteRangeOp = (DeleteRangeOperation) op; + deleteRangeEntries.add(deleteRangeOp.rangeEntry); + } + } + try { + RangeQueryIndex rangeQueryIdx = new RangeQueryIndex<>(deleteRangeEntries); + for (Op op : opList) { + if (op instanceof DeleteRangeOperation) { + DeleteRangeOperation deleteRangeOp = (DeleteRangeOperation) op; + rangeQueryIdx.removeRange(deleteRangeOp.rangeEntry); + op.apply(family, writeBatch); + } else { + // Find a delete range op matching which would contain the key after the + // operation has occurred. If there is no such operation then perform the operation otherwise discard the + // op. 
+ SingleKeyOp singleKeyOp = (SingleKeyOp) op; + if (!rangeQueryIdx.containsIntersectingRange(singleKeyOp.getKeyBytes())) { + op.apply(family, writeBatch); + } else { + debug(() -> { + RangeQueryIndex.Range deleteRangeOp = + rangeQueryIdx.getFirstIntersectingRange(singleKeyOp.getKeyBytes()); + return String.format("Discarding Operation with Key: %s as it falls within the range of [%s, %s)", + singleKeyOp.getKeyBytes(), deleteRangeOp.getStartInclusive(), deleteRangeOp.getEndExclusive()); + }); + discardedCount++; + discardedSize += op.totalLength(); + } + } + } + debug(this::summary); + } catch (IOException e) { + throw new RocksDatabaseException("Failed to prepare batch write", e); } - debug(this::summary); } private String summary() { @@ -299,8 +432,10 @@ void clear() { } private void deleteIfExist(Bytes key) { - final SingleKeyOp previous = ops.remove(key); - if (previous != null) { + // remove previous first in order to call release() + Integer previousIndex = singleOpKeys.remove(key); + if (previousIndex != null) { + final SingleKeyOp previous = (SingleKeyOp) ops.remove(previousIndex); previous.close(); discardedSize += previous.totalLength(); discardedCount++; @@ -314,9 +449,10 @@ void overwriteIfExists(SingleKeyOp op) { Bytes key = op.getKeyBytes(); deleteIfExist(key); batchSize += op.totalLength(); - Op overwritten = ops.put(key, op); - Preconditions.checkState(overwritten == null); - + int newIndex = opIndex.getAndIncrement(); + final Integer overwrittenOpKey = singleOpKeys.put(key, newIndex); + final Op overwrittenOp = ops.put(newIndex, op); + Preconditions.checkState(overwrittenOpKey == null && overwrittenOp == null); debug(() -> String.format("%s %s, %s; key=%s", this, op instanceof DeleteOp ? delString(op.totalLength()) : putString(op.keyLen(), op.valLen()), batchSizeDiscardedString(), key)); @@ -332,6 +468,11 @@ void delete(CodecBuffer key) { overwriteIfExists(new DeleteOp(key)); } + void deleteRange(byte[] startKey, byte[] endKey) { + delRangeCount++; + ops.put(opIndex.getAndIncrement(), new DeleteRangeOperation(startKey, endKey)); + } + String putString(int keySize, int valueSize) { return String.format("put(key: %s, value: %s), #put=%s", byteSize2String(keySize), byteSize2String(valueSize), putCount); @@ -361,6 +502,11 @@ void delete(ColumnFamily family, CodecBuffer key) { name2cache.computeIfAbsent(family.getName(), k -> new FamilyCache(family)).delete(key); } + void deleteRange(ColumnFamily family, byte[] startKey, byte[] endKey) { + name2cache.computeIfAbsent(family.getName(), k -> new FamilyCache(family)) + .deleteRange(startKey, endKey); + } + /** Prepare batch write for the entire cache. 
*/ UncheckedAutoCloseable prepareBatchWrite() throws RocksDatabaseException { for (Map.Entry e : name2cache.entrySet()) { @@ -382,6 +528,7 @@ String getCommitString() { int opSize = 0; int discardedCount = 0; int discardedSize = 0; + int delRangeCount = 0; for (FamilyCache f : name2cache.values()) { putCount += f.putCount; @@ -389,12 +536,13 @@ String getCommitString() { opSize += f.batchSize; discardedCount += f.discardedCount; discardedSize += f.discardedSize; + delRangeCount += f.delRangeCount; } final int opCount = putCount + delCount; return String.format( - "#put=%s, #del=%s, batchSize: %s, discarded: %s, committed: %s", - putCount, delCount, + "#put=%s, #del=%s, #delRange=%s, batchSize: %s, discarded: %s, committed: %s", + putCount, delCount, delRangeCount, countSize2String(opCount, opSize), countSize2String(discardedCount, discardedSize), countSize2String(opCount - discardedCount, opSize - discardedSize)); @@ -449,4 +597,8 @@ public void put(ColumnFamily family, byte[] key, byte[] value) { opCache.put(family, DIRECT_CODEC_BUFFER_CODEC.fromPersistedFormat(key), DIRECT_CODEC_BUFFER_CODEC.fromPersistedFormat(value)); } + + public void deleteRange(ColumnFamily family, byte[] startKey, byte[] endKey) { + opCache.deleteRange(family, startKey, endKey); + } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java index 2aef5daa3c90..ec9d900a1d47 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java @@ -211,6 +211,15 @@ public void deleteWithBatch(BatchOperation batch, byte[] key) { } + @Override + public void deleteRangeWithBatch(BatchOperation batch, byte[] beginKey, byte[] endKey) { + if (batch instanceof RDBBatchOperation) { + ((RDBBatchOperation) batch).deleteRange(family, beginKey, endKey); + } else { + throw new IllegalArgumentException("batch should be RDBBatchOperation"); + } + } + @Override public KeyValueIterator iterator(byte[] prefix, IteratorType type) throws RocksDatabaseException { diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RangeQueryIndex.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RangeQueryIndex.java new file mode 100644 index 000000000000..d140794f347a --- /dev/null +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RangeQueryIndex.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdds.utils.db; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeMap; + +/** + * An index for answering "does this point fall within any of these ranges?" efficiently. + * + *

+ * <p>The indexed ranges are half-open intervals of the form
+ * {@code [startInclusive, endExclusive)}.
+ *
+ * <p>Core idea (sweep-line / prefix-sum over range boundaries):
+ * Instead of scanning every range on each query, this index stores a sorted map from
+ * boundary points to a running count of "active" ranges at that point.
+ *
+ * <ul>
+ *   <li>For each range {@code [s, e)}, we add a delta {@code +1} at {@code s} and a delta
+ * {@code -1} at {@code e}.</li>
+ *   <li>We then convert the deltas into a prefix sum in key order, so every boundary key
+ * stores the number of ranges active at that coordinate.</li>
+ *   <li>For any query point {@code k}, the active count is {@code floorEntry(k).value}.
+ * If it is {@code > 0}, then {@code k} intersects at least one range.</li>
+ * </ul>
+ *
+ * <p>Update model: this index supports only removing ranges that were part of the
+ * initial set. Removal updates the prefix sums for keys in {@code [startInclusive, endExclusive)}
+ * (net effect of removing {@code +1} at start and {@code -1} at end).
+ *
+ * <p>Complexities:
+ * <ul>
+ *   <li>Build: {@code O(R log B)} where {@code R} is #ranges and {@code B} is #distinct boundaries.</li>
+ *   <li>{@link #containsIntersectingRange(Comparable)}: {@code O(log B)}.</li>
+ *   <li>{@link #removeRange(Range)}: {@code O(log B + K)} where {@code K} is #boundaries in the range.</li>
+ * </ul>
+ *
+ * @param <T> boundary type (must be {@link Comparable} to be stored in a {@link TreeMap})
+ */
+class RangeQueryIndex<T extends Comparable<T>> {
+
+  private final TreeMap<T, Integer> rangeCountIndexMap;
+  private final Set<Range<T>> ranges;
+
+  RangeQueryIndex(Set<Range<T>> ranges) {
+    this.rangeCountIndexMap = new TreeMap<>();
+    this.ranges = ranges;
+    init();
+  }
+
+  private void init() {
+    // Phase 1: store boundary deltas (+1 at start, -1 at end).
+    for (Range<T> range : ranges) {
+      rangeCountIndexMap.compute(range.startInclusive, (k, v) -> v == null ? 1 : v + 1);
+      rangeCountIndexMap.compute(range.endExclusive, (k, v) -> v == null ? -1 : v - 1);
+    }
+
+    // Phase 2: convert deltas to prefix sums so each key holds the active range count at that coordinate.
+    int totalCount = 0;
+    for (Map.Entry<T, Integer> entry : rangeCountIndexMap.entrySet()) {
+      totalCount += entry.getValue();
+      entry.setValue(totalCount);
+    }
+  }
+
+  /**
+   * Remove a range from the index.
+   *
+   * <p>This method assumes the range set is "popped" over time (ranges are removed but not added).
+   * Internally, removing {@code [s, e)} decreases the active count by 1 for all boundary keys in
+   * {@code [s, e)} and leaves counts outside the range unchanged.
+   *
+   * @throws IOException if the given {@code range} is not part of the indexed set
+   */
+  void removeRange(Range<T> range) throws IOException {
+    if (!ranges.contains(range)) {
+      throw new IOException(String.format("Range %s not found in index structure : %s", range, ranges));
+    }
+    ranges.remove(range);
+    for (Map.Entry<T, Integer> entry : rangeCountIndexMap.subMap(range.startInclusive, true,
+        range.endExclusive, false).entrySet()) {
+      entry.setValue(entry.getValue() - 1);
+    }
+  }
+
+  /**
+   * @return true iff {@code key} is contained in at least one indexed range.
+   *
+   * <p>Implementation detail: uses {@link TreeMap#floorEntry(Object)} to find the last boundary
+   * at or before {@code key}, and checks the prefix-summed active count at that point.
+   */
+  boolean containsIntersectingRange(T key) {
+    Map.Entry<T, Integer> countEntry = rangeCountIndexMap.floorEntry(key);
+    if (countEntry == null) {
+      return false;
+    }
+    return countEntry.getValue() > 0;
+  }
+
+  /**
+   * Returns an intersecting range containing {@code key}, if any.
+   *
+   * <p>This method first checks {@link #containsIntersectingRange(Comparable)} using the index;
+   * if the count indicates an intersection exists, it then scans the backing {@link #ranges}
+   * set to find a concrete {@link Range} that contains {@code key}.
+   *
+   * <p>Note that because {@link #ranges} is a {@link Set}, "first" refers to whatever iteration
+   * order that set provides (it is not guaranteed to be deterministic unless the provided set is).
+   *
+   * @return a containing range, or null if none intersect
+   */
+  Range<T> getFirstIntersectingRange(T key) {
+    Map.Entry<T, Integer> countEntry = rangeCountIndexMap.floorEntry(key);
+    if (countEntry == null) {
+      return null;
+    }
+    for (Range<T> range : ranges) {
+      if (range.contains(key)) {
+        return range;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * A half-open interval {@code [startInclusive, endExclusive)}.
+   *
+   * <p>For a value {@code k} to be contained, it must satisfy:
+   * {@code startInclusive <= k < endExclusive} (according to {@link Comparable#compareTo(Object)}).
+ */ + static final class Range> { + private final T startInclusive; + private final T endExclusive; + + Range(T startInclusive, T endExclusive) { + this.startInclusive = Objects.requireNonNull(startInclusive, "start == null"); + this.endExclusive = Objects.requireNonNull(endExclusive, "end == null"); + } + + @Override + public boolean equals(Object o) { + return this == o; + } + + @Override + public int hashCode() { + return Objects.hash(startInclusive, endExclusive); + } + + T getStartInclusive() { + return startInclusive; + } + + T getEndExclusive() { + return endExclusive; + } + + /** + * @return true iff {@code key} is within {@code [startInclusive, endExclusive)}. + */ + public boolean contains(T key) { + return startInclusive.compareTo(key) <= 0 && key.compareTo(endExclusive) < 0; + } + + @Override + public String toString() { + return "Range{" + + "startInclusive=" + startInclusive + + ", endExclusive=" + endExclusive + + '}'; + } + } +} diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java index 5aff93518044..621178f687d5 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java @@ -308,6 +308,16 @@ public void batchDelete(ManagedWriteBatch writeBatch, ByteBuffer key) } } + public void batchDeleteRange(ManagedWriteBatch writeBatch, byte[] beginKey, byte[] endKey) + throws RocksDatabaseException { + try (UncheckedAutoCloseable ignored = acquire()) { + writeBatch.deleteRange(getHandle(), beginKey, endKey); + } catch (RocksDBException e) { + throw toRocksDatabaseException(this, "batchDeleteRange key " + bytes2String(beginKey) + " - " + + bytes2String(endKey), e); + } + } + public void batchPut(ManagedWriteBatch writeBatch, ByteBuffer key, ByteBuffer value) throws RocksDatabaseException { if (LOG.isDebugEnabled()) { diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java index fc0490344062..6904f22d7d8c 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java @@ -134,6 +134,14 @@ default VALUE getReadCopy(KEY key) throws RocksDatabaseException, CodecException */ void deleteWithBatch(BatchOperation batch, KEY key) throws CodecException; + /** + * Deletes a range of keys from the metadata store as part of a batch operation. + * @param batch Batch operation to perform the delete operation. + * @param beginKey start metadata key, inclusive. + * @param endKey end metadata key, exclusive. + */ + void deleteRangeWithBatch(BatchOperation batch, KEY beginKey, KEY endKey) throws CodecException; + /** * Deletes a range of keys from the metadata store. 
* diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java index 59e924529ce4..bd0f6321b5bc 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java @@ -392,6 +392,11 @@ public void deleteWithBatch(BatchOperation batch, KEY key) throws CodecException } } + @Override + public void deleteRangeWithBatch(BatchOperation batch, KEY beginKey, KEY endKey) throws CodecException { + rawTable.deleteRangeWithBatch(batch, encodeKey(beginKey), encodeKey(endKey)); + } + @Override public void deleteRange(KEY beginKey, KEY endKey) throws RocksDatabaseException, CodecException { rawTable.deleteRange(encodeKey(beginKey), encodeKey(endKey)); diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTable.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTable.java index 1dbb5029713a..7f2ce3fc3a5b 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTable.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTable.java @@ -90,6 +90,11 @@ public void deleteWithBatch(BatchOperation batch, KEY key) { throw new UnsupportedOperationException(); } + @Override + public void deleteRangeWithBatch(BatchOperation batch, KEY beginKey, KEY endKey) { + throw new UnsupportedOperationException(); + } + @Override public void deleteRange(KEY beginKey, KEY endKey) { map.subMap(beginKey, endKey).clear(); diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBBatchOperation.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBBatchOperation.java index aee70feaceb6..4fb1b8fa0349 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBBatchOperation.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBBatchOperation.java @@ -17,6 +17,7 @@ package org.apache.hadoop.hdds.utils.db; +import static com.google.common.primitives.UnsignedBytes.lexicographicalComparator; import static org.apache.hadoop.hdds.StringUtils.string2Bytes; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; @@ -24,17 +25,23 @@ import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.when; -import com.google.common.collect.ImmutableSet; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.NavigableMap; import java.util.Set; +import java.util.TreeMap; import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.commons.lang3.RandomStringUtils; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.utils.db.Table.KeyValue; @@ -51,10 +58,19 @@ import org.rocksdb.RocksDBException; /** - * The TestRDBBatchOperation class provides test cases to validate the functionality of RDB batch operations - * in a RocksDB-based backend. 
It verifies the correct behavior of write operations using batch processing - * and ensures the integrity of operations like put and delete when performed in batch mode. - */ + * Test class for verifying batch operations with delete ranges using the + * RDBBatchOperation and MockedConstruction of ManagedWriteBatch. + * + * This test class includes: + * - Mocking and tracking of operations including put, delete, and delete range + * within a batch operation. + * - Validation of committed operations using assertions on collected data. + * - Ensures that the batch operation interacts correctly with the + * RocksDatabase and ColumnFamilyHandle components. + * + * The test method includes: + * 1. Setup of mocked ColumnFamilyHandle and RocksDatabase.ColumnFamily. + * 2. Mocking of methods to track operations performed on*/ public class TestRDBBatchOperation { static { @@ -69,9 +85,9 @@ private static Operation getOperation(String key, String value, OpType opType) { } @Test - public void testBatchOperation() throws RocksDatabaseException, CodecException, RocksDBException { + public void testBatchOperationWithDeleteRange() throws RocksDatabaseException, CodecException, RocksDBException { try (TrackingUtilManagedWriteBatchForTesting writeBatch = new TrackingUtilManagedWriteBatchForTesting(); - RDBBatchOperation batchOperation = RDBBatchOperation.newAtomicOperation(writeBatch)) { + RDBBatchOperation batchOperation = RDBBatchOperation.newAtomicOperation(writeBatch)) { ColumnFamilyHandle columnFamilyHandle = Mockito.mock(ColumnFamilyHandle.class); RocksDatabase.ColumnFamily columnFamily = Mockito.mock(RocksDatabase.ColumnFamily.class); doAnswer((i) -> { @@ -80,6 +96,12 @@ public void testBatchOperation() throws RocksDatabaseException, CodecException, return null; }).when(columnFamily).batchPut(any(ManagedWriteBatch.class), any(ByteBuffer.class), any(ByteBuffer.class)); + doAnswer((i) -> { + ((ManagedWriteBatch)i.getArgument(0)) + .deleteRange(columnFamilyHandle, (byte[]) i.getArgument(1), (byte[]) i.getArgument(2)); + return null; + }).when(columnFamily).batchDeleteRange(any(ManagedWriteBatch.class), any(byte[].class), any(byte[].class)); + doAnswer((i) -> { ((ManagedWriteBatch)i.getArgument(0)) .delete(columnFamilyHandle, (ByteBuffer) i.getArgument(1)); @@ -90,39 +112,43 @@ public void testBatchOperation() throws RocksDatabaseException, CodecException, when(columnFamilyHandle.getName()).thenReturn(string2Bytes("test")); when(columnFamily.getName()).thenReturn("test"); Codec codec = StringCodec.get(); - // OP1: This should be skipped in favor of OP9. + // OP1 should be skipped because of OP7 batchOperation.put(columnFamily, codec.toDirectCodecBuffer("key01"), codec.toDirectCodecBuffer("value01")); - // OP2 + // OP2 should be skipped because of OP8 batchOperation.put(columnFamily, codec.toPersistedFormat("key02"), codec.toPersistedFormat("value02")); - // OP3: This should be skipped in favor of OP4. 
+ // OP3 batchOperation.put(columnFamily, codec.toDirectCodecBuffer("key03"), codec.toDirectCodecBuffer("value03")); - // OP4 + // OP4 would overwrite OP3 batchOperation.put(columnFamily, codec.toPersistedFormat("key03"), codec.toPersistedFormat("value04")); // OP5 batchOperation.delete(columnFamily, codec.toDirectCodecBuffer("key05")); - // OP6 + // OP6 : This delete operation should get skipped because of OP11 batchOperation.delete(columnFamily, codec.toPersistedFormat("key10")); // OP7 - batchOperation.put(columnFamily, codec.toDirectCodecBuffer("key04"), codec.toDirectCodecBuffer("value04")); + batchOperation.deleteRange(columnFamily, codec.toPersistedFormat("key01"), codec.toPersistedFormat("key02")); // OP8 + batchOperation.deleteRange(columnFamily, codec.toPersistedFormat("key02"), codec.toPersistedFormat("key03")); + // OP9 + batchOperation.put(columnFamily, codec.toDirectCodecBuffer("key04"), codec.toDirectCodecBuffer("value04")); + // OP10 should be skipped because of OP11 batchOperation.put(columnFamily, codec.toPersistedFormat("key06"), codec.toPersistedFormat("value05")); - //OP9 - batchOperation.put(columnFamily, codec.toDirectCodecBuffer("key01"), codec.toDirectCodecBuffer("value011")); - + // OP11 + batchOperation.deleteRange(columnFamily, codec.toPersistedFormat("key06"), codec.toPersistedFormat("key12")); + // OP12 + batchOperation.deleteRange(columnFamily, codec.toPersistedFormat("key09"), codec.toPersistedFormat("key10")); RocksDatabase db = Mockito.mock(RocksDatabase.class); doNothing().when(db).batchWrite(any()); batchOperation.commit(db); - Set expectedOps = ImmutableSet.of( - getOperation("key01", "value011", OpType.PUT_DIRECT), - getOperation("key02", "value02", OpType.PUT_DIRECT), + List expectedOps = ImmutableList.of( getOperation("key03", "value04", OpType.PUT_DIRECT), getOperation("key05", null, OpType.DELETE_DIRECT), - getOperation("key10", null, OpType.DELETE_DIRECT), + getOperation("key01", "key02", OpType.DELETE_RANGE_INDIRECT), + getOperation("key02", "key03", OpType.DELETE_RANGE_INDIRECT), getOperation("key04", "value04", OpType.PUT_DIRECT), - getOperation("key06", "value05", OpType.PUT_DIRECT)); - assertEquals(Collections.singleton("test"), writeBatch.getOperations().keySet()); - assertEquals(expectedOps, new HashSet<>(writeBatch.getOperations().get("test"))); + getOperation("key06", "key12", OpType.DELETE_RANGE_INDIRECT), + getOperation("key09", "key10", OpType.DELETE_RANGE_INDIRECT)); + assertEquals(ImmutableMap.of("test", expectedOps), writeBatch.getOperations()); } } @@ -144,25 +170,38 @@ private void performDelete(Table withBatchTable, BatchOperation withoutBatchTable.delete(key); } + private void performDeleteRange(Table withBatchTable, BatchOperation batchOperation, + Table withoutBatchTable, String startKey, String endKey) + throws RocksDatabaseException, CodecException { + withBatchTable.deleteRangeWithBatch(batchOperation, startKey, endKey); + withoutBatchTable.deleteRange(startKey, endKey); + } + private String getRandomString() { int length = ThreadLocalRandom.current().nextInt(1, 1024); return RandomStringUtils.insecure().next(length); } - private void performOpWithRandomKey(CheckedConsumer op, Set keySet, - List keyList) throws IOException { - String key = getRandomString(); - op.accept(key); - if (!keySet.contains(key)) { - keyList.add(key); - keySet.add(key); + private void performOpWithRandomKey(CheckedConsumer, IOException> op, Set keySet, + List keyList, int numberOfKeys) throws IOException { + List randomKeys = new 
ArrayList<>(numberOfKeys); + for (int i = 0; i < numberOfKeys; i++) { + randomKeys.add(getRandomString()); + } + op.accept(randomKeys); + for (String key : randomKeys) { + if (!keySet.contains(key)) { + keyList.add(key); + keySet.add(key); + } } } - private void performOpWithRandomPreExistingKey(CheckedConsumer op, List keyList) - throws IOException { - int randomIndex = ThreadLocalRandom.current().nextInt(0, keyList.size()); - op.accept(keyList.get(randomIndex)); + private void performOpWithRandomPreExistingKey(CheckedConsumer, IOException> op, List keyList, + int numberOfKeys) throws IOException { + op.accept(IntStream.range(0, numberOfKeys) + .mapToObj(i -> keyList.get(ThreadLocalRandom.current().nextInt(0, keyList.size()))) + .collect(Collectors.toList())); } @Test @@ -178,16 +217,34 @@ public void testRDBBatchOperationWithRDB() throws IOException { StringCodec.get(), StringCodec.get()); List keyList = new ArrayList<>(); Set keySet = new HashSet<>(); - List> ops = Arrays.asList( - (key) -> performPut(withBatchTable, batchOperation, withoutBatchTable, key), - (key) -> performDelete(withBatchTable, batchOperation, withoutBatchTable, key)); + NavigableMap opProbMap = new TreeMap<>(); + // Have a probablity map to run delete range only 2% of the times. + // If there are too many delete range ops at once the table iteration can become very slow for + // randomised operations. + opProbMap.put(0.49, 0); + opProbMap.put(0.98, 1); + opProbMap.put(1.0, 2); + + Map opIdxToNumKeyMap = ImmutableMap.of(0, 1, 1, 1, 2, 2); + List, IOException>> ops = Arrays.asList( + (key) -> performPut(withBatchTable, batchOperation, withoutBatchTable, key.get(0)), + (key) -> performDelete(withBatchTable, batchOperation, withoutBatchTable, key.get(0)), + (key) -> { + key.sort((key1, key2) -> lexicographicalComparator().compare(string2Bytes(key1), + string2Bytes(key2))); + performDeleteRange(withBatchTable, batchOperation, withoutBatchTable, key.get(0), key.get(1)); + }); + Map cntMap = new HashMap<>(); for (int i = 0; i < 30000; i++) { - CheckedConsumer op = ops.get(ThreadLocalRandom.current().nextInt(ops.size())); + int opIdx = opProbMap.higherEntry(ThreadLocalRandom.current().nextDouble()).getValue(); + cntMap.compute(opIdx, (k, v) -> v == null ? 
1 : (v + 1)); + CheckedConsumer, IOException> op = ops.get(opIdx); + int numberOfKeys = opIdxToNumKeyMap.getOrDefault(opIdx, 1); boolean performWithPreExistingKey = ThreadLocalRandom.current().nextBoolean(); if (performWithPreExistingKey && !keyList.isEmpty()) { - performOpWithRandomPreExistingKey(op, keyList); + performOpWithRandomPreExistingKey(op, keyList, numberOfKeys); } else { - performOpWithRandomKey(op, keySet, keyList); + performOpWithRandomKey(op, keySet, keyList, numberOfKeys); } } dbStoreWithBatch.commitBatchOperation(batchOperation); diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java index 2741834c9d75..cd155f27a96f 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java @@ -17,6 +17,7 @@ package org.apache.hadoop.hdds.utils.db; +import static org.apache.hadoop.hdds.StringUtils.bytes2String; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; @@ -66,7 +67,7 @@ public class TestRDBTableStore { public static final int MAX_DB_UPDATES_SIZE_THRESHOLD = 80; private static int count = 0; private final List families = - Arrays.asList(StringUtils.bytes2String(RocksDB.DEFAULT_COLUMN_FAMILY), + Arrays.asList(bytes2String(RocksDB.DEFAULT_COLUMN_FAMILY), "First", "Second", "Third", "Fourth", "Fifth", "Sixth", "Seventh", @@ -635,21 +636,21 @@ public void testPrefixedRangeKVs() throws Exception { // test start with a middle key startKey = StringUtils.string2Bytes( - StringUtils.bytes2String(samplePrefix) + "3"); + bytes2String(samplePrefix) + "3"); rangeKVs = testTable.getRangeKVs(startKey, blockCount, samplePrefix); assertEquals(2, rangeKVs.size()); // test with a filter - final KeyPrefixFilter filter1 = KeyPrefixFilter.newFilter(StringUtils.bytes2String(samplePrefix) + "1"); + final KeyPrefixFilter filter1 = KeyPrefixFilter.newFilter(bytes2String(samplePrefix) + "1"); startKey = StringUtils.string2Bytes( - StringUtils.bytes2String(samplePrefix)); + bytes2String(samplePrefix)); rangeKVs = testTable.getRangeKVs(startKey, blockCount, samplePrefix, filter1); assertEquals(1, rangeKVs.size()); // test start with a non-exist key startKey = StringUtils.string2Bytes( - StringUtils.bytes2String(samplePrefix) + 123); + bytes2String(samplePrefix) + 123); rangeKVs = testTable.getRangeKVs(startKey, 10, samplePrefix); assertEquals(0, rangeKVs.size()); } @@ -775,4 +776,77 @@ private void populateTable(Table table, } } } + + @Test + public void batchDeleteWithRange() throws Exception { + final Table testTable = rdbStore.getTable("Fifth"); + try (BatchOperation batch = rdbStore.initBatchOperation()) { + + //given + String keyStr = RandomStringUtils.secure().next(10); + byte[] startKey = ("1-" + keyStr).getBytes(StandardCharsets.UTF_8); + byte[] keyInRange1 = ("2-" + keyStr).getBytes(StandardCharsets.UTF_8); + byte[] keyInRange2 = ("3-" + keyStr).getBytes(StandardCharsets.UTF_8); + byte[] endKey = ("4-" + keyStr).getBytes(StandardCharsets.UTF_8); + byte[] value = + RandomStringUtils.secure().next(10).getBytes(StandardCharsets.UTF_8); + testTable.put(startKey, value); + testTable.put(keyInRange1, value); + testTable.put(keyInRange2, value); + testTable.put(endKey, value); + 
assertNotNull(testTable.get(startKey)); + assertNotNull(testTable.get(keyInRange1)); + assertNotNull(testTable.get(keyInRange2)); + assertNotNull(testTable.get(endKey)); + + //when + testTable.deleteRangeWithBatch(batch, startKey, endKey); + rdbStore.commitBatchOperation(batch); + + //then + assertNull(testTable.get(startKey)); + assertNull(testTable.get(keyInRange1)); + assertNull(testTable.get(keyInRange2)); + assertNotNull(testTable.get(endKey)); + } + } + + @Test + public void orderOfBatchOperations() throws Exception { + final Table testTable = rdbStore.getTable("Fifth"); + try (BatchOperation batch = rdbStore.initBatchOperation()) { + + //given + String keyStr = RandomStringUtils.secure().next(10); + byte[] startKey = ("1-" + keyStr).getBytes(StandardCharsets.UTF_8); + byte[] keyInRange1 = ("2-" + keyStr).getBytes(StandardCharsets.UTF_8); + byte[] endKey = ("3-" + keyStr).getBytes(StandardCharsets.UTF_8); + byte[] value1 = ("value1-" + RandomStringUtils.secure().next(10)).getBytes(StandardCharsets.UTF_8); + byte[] value2 = ("value2-" + RandomStringUtils.secure().next(10)).getBytes(StandardCharsets.UTF_8); + byte[] value3 = ("value3-" + RandomStringUtils.secure().next(10)).getBytes(StandardCharsets.UTF_8); + + //when + testTable.putWithBatch(batch, startKey, value1); + testTable.putWithBatch(batch, keyInRange1, value1); + testTable.deleteWithBatch(batch, keyInRange1); + // ops map key should be <, 1> + testTable.deleteRangeWithBatch(batch, startKey, endKey); + testTable.putWithBatch(batch, startKey, value2); + testTable.putWithBatch(batch, keyInRange1, value2); + // ops map key is <, 2>. + testTable.deleteRangeWithBatch(batch, startKey, keyInRange1); + testTable.putWithBatch(batch, endKey, value1); + testTable.putWithBatch(batch, endKey, value2); + // ops map key is <, 3>. + testTable.deleteRangeWithBatch(batch, startKey, endKey); + testTable.putWithBatch(batch, startKey, value3); + + rdbStore.commitBatchOperation(batch); + + //then + assertEquals(bytes2String(value3), bytes2String(testTable.get(startKey))); + assertNull(testTable.get(keyInRange1)); + assertEquals(bytes2String(value2), bytes2String(testTable.get(endKey))); + } + } } diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRangeQueryIndex.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRangeQueryIndex.java new file mode 100644 index 000000000000..aa75d053902e --- /dev/null +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRangeQueryIndex.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdds.utils.db; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.IOException; +import java.util.LinkedHashSet; +import java.util.Set; +import org.apache.hadoop.hdds.utils.db.RangeQueryIndex.Range; +import org.junit.jupiter.api.Test; + +/** + * Test class for validating the behavior and functionality of the {@code RangeQueryIndex} class. + * + *

+ * <p>This class contains a collection of unit tests to ensure correct behavior of the range
+ * indexing system under various scenarios, such as intersections, overlaps, boundary conditions,
+ * and removal of range objects. The tests leverage the {@code Range} class for defining
+ * half-open intervals and test different operations provided by the {@code RangeQueryIndex}.
+ *
+ * <p>
The tested operations include: + * - Checking for intersecting ranges. + * - Retrieving the first intersecting range. + * - Handling overlaps and nested ranges. + * - Adjacency between ranges. + * - Behaviors when handling duplicate ranges or ranges with identical bounds but different instances. + * - Error conditions when attempting invalid removals of ranges. + */ +public class TestRangeQueryIndex { + + @Test + public void testContainsIntersectingRangeHalfOpenBoundaries() { + final Range r1 = new Range<>(10, 20); // [10, 20) + final Range r2 = new Range<>(30, 40); // [30, 40) + final Set> ranges = new LinkedHashSet<>(); + ranges.add(r1); + ranges.add(r2); + + final RangeQueryIndex index = new RangeQueryIndex<>(ranges); + + // Before first range + assertFalse(index.containsIntersectingRange(0)); + assertFalse(index.containsIntersectingRange(9)); + + // Start is inclusive + assertTrue(index.containsIntersectingRange(10)); + assertTrue(index.containsIntersectingRange(19)); + + // End is exclusive + assertFalse(index.containsIntersectingRange(20)); + assertFalse(index.containsIntersectingRange(29)); + + // Second range + assertTrue(index.containsIntersectingRange(30)); + assertTrue(index.containsIntersectingRange(39)); + assertFalse(index.containsIntersectingRange(40)); + assertFalse(index.containsIntersectingRange(100)); + } + + @Test + public void testGetFirstIntersectingRangeAndRemovalWithOverlaps() throws Exception { + // Use LinkedHashSet to make iteration order deterministic for getFirstIntersectingRange(). + final Range r2 = new Range<>(5, 15); // overlaps with r1 for [5, 10) + final Range r1 = new Range<>(0, 10); + final Set> ranges = new LinkedHashSet<>(); + ranges.add(r2); + ranges.add(r1); + + final RangeQueryIndex index = new RangeQueryIndex<>(ranges); + + assertTrue(index.containsIntersectingRange(7)); + final Range first = index.getFirstIntersectingRange(7); + assertNotNull(first); + assertSame(r2, first, "should return the first containing range in set iteration order"); + + index.removeRange(r2); + assertTrue(index.containsIntersectingRange(7), "still intersecting due to remaining overlapping range"); + assertSame(r1, index.getFirstIntersectingRange(7)); + + index.removeRange(r1); + assertFalse(index.containsIntersectingRange(7)); + assertNull(index.getFirstIntersectingRange(7)); + } + + @Test + public void testAdjacentRangesShareBoundary() { + final Range left = new Range<>(0, 10); // [0, 10) + final Range right = new Range<>(10, 20); // [10, 20) + final Set> ranges = new LinkedHashSet<>(); + ranges.add(left); + ranges.add(right); + final RangeQueryIndex index = new RangeQueryIndex<>(ranges); + + // End is exclusive for left; start is inclusive for right. + assertTrue(index.containsIntersectingRange(9)); + assertTrue(index.containsIntersectingRange(0)); + assertTrue(index.containsIntersectingRange(10)); + assertTrue(index.containsIntersectingRange(19)); + assertFalse(index.containsIntersectingRange(20)); + } + + @Test + public void testMultipleOverlapsAndNestedRangesRemovalOrder() throws Exception { + // rOuter covers everything; rMid overlaps partially; rInner is nested. 
+ final Range rOuter = new Range<>(0, 100); // [0, 100) + final Range rMid = new Range<>(20, 80); // [20, 80) + final Range rInner = new Range<>(30, 40); // [30, 40) + final Set> ranges = new LinkedHashSet<>(); + ranges.add(rOuter); + ranges.add(rMid); + ranges.add(rInner); + final RangeQueryIndex index = new RangeQueryIndex<>(ranges); + + // Covered by outer only + assertTrue(index.containsIntersectingRange(10)); + assertSame(rOuter, index.getFirstIntersectingRange(10)); + + // Covered by all three + assertTrue(index.containsIntersectingRange(35)); + assertSame(rOuter, index.getFirstIntersectingRange(35)); + + // Remove the middle range first. + index.removeRange(rMid); + assertTrue(index.containsIntersectingRange(35), "still covered by outer + inner"); + + // Remove the inner range next. + index.removeRange(rInner); + assertTrue(index.containsIntersectingRange(35), "still covered by outer"); + + // Now remove the outer range; should become uncovered. + index.removeRange(rOuter); + assertFalse(index.containsIntersectingRange(35)); + assertNull(index.getFirstIntersectingRange(35)); + } + + @Test + public void testDuplicateSameBoundsDifferentInstances() throws Exception { + // Range.equals is identity-based, so two ranges with the same bounds can co-exist in the Set. + final Range r1 = new Range<>(0, 10); + final Range r2 = new Range<>(0, 10); + final Set> ranges = new LinkedHashSet<>(); + ranges.add(r1); + ranges.add(r2); + + final RangeQueryIndex index = new RangeQueryIndex<>(ranges); + assertTrue(index.containsIntersectingRange(5)); + + // Remove one instance: should still intersect due to the other. + index.removeRange(r1); + assertTrue(index.containsIntersectingRange(5)); + + // Remove the second instance: now it should not intersect. + index.removeRange(r2); + assertFalse(index.containsIntersectingRange(5)); + } + + @Test + public void testRemoveSameInstanceTwiceThrows() throws Exception { + final Range r = new Range<>(0, 10); + final Set> ranges = new LinkedHashSet<>(); + ranges.add(r); + + final RangeQueryIndex index = new RangeQueryIndex<>(ranges); + index.removeRange(r); + assertThrows(IOException.class, () -> index.removeRange(r)); + } + + @Test + public void testRemoveRangeNotFoundThrows() throws Exception { + final Range r1 = new Range<>(0, 10); + final Set> ranges = new LinkedHashSet<>(); + ranges.add(r1); + + final RangeQueryIndex index = new RangeQueryIndex<>(ranges); + + // Range.equals is identity-based, so a different object with same bounds is not "found". + final Range sameBoundsDifferentInstance = new Range<>(0, 10); + assertThrows(IOException.class, () -> index.removeRange(sameBoundsDifferentInstance)); + + // Removing the original instance works. 
+ index.removeRange(r1); + assertFalse(index.containsIntersectingRange(0)); + } + + @Test + public void testRemoveRangeDifferentBoundsThrows() { + final Range r1 = new Range<>(0, 10); + final Set> ranges = new LinkedHashSet<>(); + ranges.add(r1); + final RangeQueryIndex index = new RangeQueryIndex<>(ranges); + + assertThrows(IOException.class, () -> index.removeRange(new Range<>(1, 2))); + assertTrue(index.containsIntersectingRange(1), "index should remain unchanged after failed remove"); + } +} + + diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java index b488997b5228..77ed8a824874 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java @@ -92,7 +92,6 @@ import org.apache.hadoop.hdds.utils.db.RocksDBCheckpoint; import org.apache.hadoop.hdds.utils.db.RocksDatabase; import org.apache.hadoop.hdds.utils.db.Table; -import org.apache.hadoop.hdds.utils.db.TableIterator; import org.apache.hadoop.hdds.utils.db.managed.ManagedColumnFamilyOptions; import org.apache.hadoop.hdds.utils.db.managed.ManagedDBOptions; import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB; @@ -584,11 +583,7 @@ private static void deleteKeysFromTableWithBucketPrefix(OMMetadataManager metada String endKey = getLexicographicallyHigherString(prefix); LOG.debug("Deleting key range from {} - startKey: {}, endKey: {}", table.getName(), prefix, endKey); - try (TableIterator itr = table.keyIterator(prefix)) { - while (itr.hasNext()) { - table.deleteWithBatch(batchOperation, itr.next()); - } - } + table.deleteRangeWithBatch(batchOperation, prefix, endKey); } @VisibleForTesting diff --git a/pom.xml b/pom.xml index 44d822cfb383..223a11ffb4fc 100644 --- a/pom.xml +++ b/pom.xml @@ -198,7 +198,7 @@ 3.25.8 2.7.0 - 1.75.0 + 1.77.0 4.1.127.Final 3.25.8 1.0.10
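
For readers unfamiliar with the sweep-line / prefix-sum trick that `RangeQueryIndex` relies on, here is a small, self-contained Java sketch of the same idea using plain integers. It is illustrative only: the class name `RangeSweepDemo` and all identifiers are made up for this example and are not part of the patch.

```java
import java.util.Map;
import java.util.TreeMap;

/** Standalone illustration of the boundary-delta / prefix-sum lookup used by RangeQueryIndex. */
public final class RangeSweepDemo {
  public static void main(String[] args) {
    // Two half-open ranges: [10, 20) and [30, 40).
    int[][] ranges = {{10, 20}, {30, 40}};

    // Phase 1: +1 at each inclusive start, -1 at each exclusive end.
    TreeMap<Integer, Integer> index = new TreeMap<>();
    for (int[] r : ranges) {
      index.merge(r[0], 1, Integer::sum);
      index.merge(r[1], -1, Integer::sum);
    }

    // Phase 2: prefix-sum the deltas so each boundary key stores the number of active ranges there.
    int running = 0;
    for (Map.Entry<Integer, Integer> e : index.entrySet()) {
      running += e.getValue();
      e.setValue(running);
    }

    // Query: a key is covered iff the floor entry's count is positive.
    for (int key : new int[] {9, 10, 19, 20, 35, 40}) {
      Map.Entry<Integer, Integer> floor = index.floorEntry(key);
      boolean covered = floor != null && floor.getValue() > 0;
      // Prints: 9 false, 10 true, 19 true, 20 false, 35 true, 40 false.
      System.out.println(key + " covered? " + covered);
    }
  }
}
```

This is the property `prepareBatchWrite` exploits above: once the delete-range boundaries in a batch are prefix-summed, a single-key put or delete whose key maps to a positive floor-entry count is known to fall inside a delete range that appears later in the same batch, so it can be discarded instead of being written.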