HDDS-14245. Optimize search for deleted range using prefix count index structure #9690
@@ -21,13 +21,19 @@
import com.google.common.base.Preconditions;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.hdds.utils.db.RocksDatabase.ColumnFamily;
import org.apache.hadoop.hdds.utils.db.managed.ManagedDirectSlice;

@@ -84,7 +90,7 @@ private static String countSize2String(int count, long size) {
   * To implement {@link #equals(Object)} and {@link #hashCode()}
   * based on the contents of the bytes.
   */
-  static final class Bytes implements Closeable {
+  static final class Bytes implements Comparable<Bytes>, Closeable {
    private final AbstractSlice<?> slice;
    /** Cache the hash value. */
    private final int hash;

@@ -130,6 +136,12 @@ public String toString() {
      return slice.toString();
    }

    // This method mimics the ByteWiseComparator in RocksDB.
    @Override
    public int compareTo(RDBBatchOperation.Bytes that) {
      return this.slice.compare(that.slice);
    }

    @Override
    public void close() {
      slice.close();

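For reference, the comparator this mimics — RocksDB's default bytewise comparator — orders keys lexicographically by unsigned byte value. A minimal standalone sketch of that ordering (a hypothetical helper, not part of this PR, which instead delegates to the slice comparison):

static int compareUnsignedLexicographic(byte[] a, byte[] b) {
  final int n = Math.min(a.length, b.length);
  for (int i = 0; i < n; i++) {
    // Compare as unsigned values (0..255), not as signed Java bytes.
    final int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
    if (cmp != 0) {
      return cmp;
    }
  }
  // A shorter key sorts before any longer key that it prefixes.
  return Integer.compare(a.length, b.length);
}
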
@@ -239,6 +251,40 @@ boolean closeImpl() {
      }
    }

    /**
     * Delete range operation to be applied to a {@link ColumnFamily} batch.
     */
    private final class DeleteRangeOperation extends Op {
      private final byte[] startKey;
      private final byte[] endKey;
      private final RangeQueryIndex.Range<Bytes> rangeEntry;

      private DeleteRangeOperation(byte[] startKey, byte[] endKey) {
        this.startKey = Objects.requireNonNull(startKey, "startKey == null");
        this.endKey = Objects.requireNonNull(endKey, "endKey == null");
        this.rangeEntry = new RangeQueryIndex.Range<>(new Bytes(startKey), new Bytes(endKey));
      }

      @Override
      public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException {
        family.batchDeleteRange(batch, startKey, endKey);
      }

      @Override
      int totalLength() {
        return startKey.length + endKey.length;
      }

      @Override
      boolean closeImpl() {
        if (super.closeImpl()) {
          IOUtils.close(LOG, rangeEntry.getStartInclusive(), rangeEntry.getEndExclusive());
          return true;
        }
        return false;
      }
    }

    /** Cache and deduplicate db ops (put/delete). */
    private class OpCache {
      /** A (family name -> {@link FamilyCache}) map. */

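For context, the underlying RocksDB primitive wrapped here deletes every key in the half-open range [startKey, endKey). A minimal sketch using the plain RocksJava API rather than Ozone's managed wrappers (the database path is made up for illustration):

import java.nio.charset.StandardCharsets;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

public final class DeleteRangeExample {
  public static void main(String[] args) throws Exception {
    RocksDB.loadLibrary();
    try (Options options = new Options().setCreateIfMissing(true);
         RocksDB db = RocksDB.open(options, "/tmp/example-db");
         WriteBatch batch = new WriteBatch();
         WriteOptions writeOptions = new WriteOptions()) {
      // Removes every key k with "a" <= k < "b" as a single batched operation.
      batch.deleteRange("a".getBytes(StandardCharsets.UTF_8), "b".getBytes(StandardCharsets.UTF_8));
      db.write(writeOptions, batch);
    }
  }
}
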
@@ -249,36 +295,123 @@ private class FamilyCache {
      private final ColumnFamily family;

      /**
-      * A mapping of keys to operations for batch processing in the {@link FamilyCache}.
-      * The keys are represented as {@link Bytes} objects, encapsulating the byte array or buffer
-      * for efficient equality and hashing. The values are instances of {@link Op}, representing
-      * different types of operations that can be applied to a {@link ColumnFamily}.
       * Maps each single-key operation's key to the insertion index of that operation in {@code ops}.
       *
       * Keys are {@link Bytes} instances, so each key appears at most once; adding an operation for
       * an existing key removes the previous operation first, letting later writes overwrite earlier
       * ones. The stored index records when the surviving operation was added and is used to sort
       * and filter operations during batch write.
       */
-     private final Map<Bytes, SingleKeyOp> ops = new HashMap<>();
      private final Map<Bytes, Integer> singleOpKeys = new HashMap<>();
      /**
       * Maps each operation's insertion index to the {@link Op} itself.
       *
       * This is the primary container for pending batch operations (put, delete, delete range).
       * The integer index fixes the insertion order so that operations can be replayed in the
       * same order when the batch is written to RocksDB.
       */
      private final Map<Integer, Op> ops = new HashMap<>();
      private boolean isCommit;

      private long batchSize;
      private long discardedSize;
      private int discardedCount;
      private int putCount;
      private int delCount;
      private int delRangeCount;
      private AtomicInteger opIndex;

      FamilyCache(ColumnFamily family) {
        this.family = family;
        this.opIndex = new AtomicInteger(0);
      }

-     /** Prepare batch write for the entire family. */
      /**
       * Prepares the batch write for the entire family.
       *
       * The method:
       * 1. Checks that the batch has not already been committed.
       * 2. Sorts all pending operations by {@code opIndex} to restore insertion order.
       * 3. Builds a {@link RangeQueryIndex} over all delete-range operations, then walks the
       *    operations in order: each delete-range operation is removed from the index and applied,
       *    while a single-key operation is discarded if a delete-range operation later in the
       *    batch covers its key (the range delete would erase it anyway) and applied otherwise.
       * 4. Logs a summary of the batch for debugging.
       *
       * @throws RocksDatabaseException if an operation cannot be applied to the write batch.
       */
      void prepareBatchWrite() throws RocksDatabaseException {
        Preconditions.checkState(!isCommit, "%s is already committed.", this);
        isCommit = true;
-       for (Op op : ops.values()) {
-         op.apply(family, writeBatch);
        // Sort entries by opIndex and flush the operations to the batch in the same order.
        List<Op> opList = ops.entrySet().stream().sorted(Comparator.comparingInt(Map.Entry::getKey))
            .map(Map.Entry::getValue).collect(Collectors.toList());
        Set<RangeQueryIndex.Range<Bytes>> deleteRangeEntries = new HashSet<>();
        for (Op op : opList) {
          if (op instanceof DeleteRangeOperation) {
            DeleteRangeOperation deleteRangeOp = (DeleteRangeOperation) op;
            deleteRangeEntries.add(deleteRangeOp.rangeEntry);
          }
        }
        try {
          RangeQueryIndex<Bytes> rangeQueryIdx = new RangeQueryIndex<>(deleteRangeEntries);
          for (Op op : opList) {
            if (op instanceof DeleteRangeOperation) {
              DeleteRangeOperation deleteRangeOp = (DeleteRangeOperation) op;
              rangeQueryIdx.removeRange(deleteRangeOp.rangeEntry);
              op.apply(family, writeBatch);
            } else {
              // If a delete-range operation later in the batch covers this key, the key would be
              // deleted anyway, so discard this op; otherwise apply it.
              SingleKeyOp singleKeyOp = (SingleKeyOp) op;
              if (!rangeQueryIdx.containsIntersectingRange(singleKeyOp.getKeyBytes())) {
                op.apply(family, writeBatch);
              } else {
                debug(() -> {
                  RangeQueryIndex.Range<Bytes> deleteRangeOp =
                      rangeQueryIdx.getFirstIntersectingRange(singleKeyOp.getKeyBytes());
                  return String.format("Discarding operation with key %s as it falls within the range [%s, %s)",
                      singleKeyOp.getKeyBytes(), deleteRangeOp.getStartInclusive(), deleteRangeOp.getEndExclusive());
                });
                discardedCount++;
                discardedSize += op.totalLength();
              }
            }
          }
          debug(this::summary);
        } catch (IOException e) {
          throw new RocksDatabaseException("Failed to prepare batch write", e);
        }
-       debug(this::summary);
      }

      private String summary() {

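The RangeQueryIndex class itself is not part of this diff. As a rough sketch of the idea only — assuming non-overlapping ranges, while the PR's prefix count index may well differ — a sorted map can answer "does any remaining delete range cover this key?" in O(log n):

import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

final class SimpleRangeIndex<K extends Comparable<K>> {
  // Maps each range's inclusive start to its exclusive end.
  private final NavigableMap<K, K> ranges = new TreeMap<>();

  void addRange(K startInclusive, K endExclusive) {
    ranges.put(startInclusive, endExclusive);
  }

  void removeRange(K startInclusive) {
    ranges.remove(startInclusive);
  }

  /** Returns true if some range [start, end) contains the key. */
  boolean containsKey(K key) {
    // With non-overlapping ranges, the only candidate is the range
    // with the greatest start <= key; check whether it extends past key.
    final Map.Entry<K, K> floor = ranges.floorEntry(key);
    return floor != null && key.compareTo(floor.getValue()) < 0;
  }
}
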
@@ -299,8 +432,10 @@ void clear() {
      }

      private void deleteIfExist(Bytes key) {
-       final SingleKeyOp previous = ops.remove(key);
-       if (previous != null) {
        // remove previous first in order to call release()
        Integer previousIndex = singleOpKeys.remove(key);
        if (previousIndex != null) {
          final SingleKeyOp previous = (SingleKeyOp) ops.remove(previousIndex);
          previous.close();
          discardedSize += previous.totalLength();
          discardedCount++;

@@ -314,9 +449,10 @@ void overwriteIfExists(SingleKeyOp op) {
        Bytes key = op.getKeyBytes();
        deleteIfExist(key);
        batchSize += op.totalLength();
-       Op overwritten = ops.put(key, op);
-       Preconditions.checkState(overwritten == null);

        int newIndex = opIndex.getAndIncrement();
        final Integer overwrittenOpKey = singleOpKeys.put(key, newIndex);
        final Op overwrittenOp = ops.put(newIndex, op);
        Preconditions.checkState(overwrittenOpKey == null && overwrittenOp == null);
        debug(() -> String.format("%s %s, %s; key=%s", this,
            op instanceof DeleteOp ? delString(op.totalLength()) : putString(op.keyLen(), op.valLen()),
            batchSizeDiscardedString(), key));

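The two-map bookkeeping above is a general dedup-with-ordering pattern: one map deduplicates by key, the other preserves insertion order by index. A standalone sketch with simplified, hypothetical types:

import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

final class OrderedDedupCache {
  private final Map<String, Integer> keyToIndex = new HashMap<>();
  private final Map<Integer, String> indexToOp = new HashMap<>();
  private int nextIndex;

  void put(String key, String op) {
    // Drop any earlier op on the same key: the newest write wins,
    // and the surviving op takes a fresh (later) index.
    final Integer previous = keyToIndex.remove(key);
    if (previous != null) {
      indexToOp.remove(previous);
    }
    final int index = nextIndex++;
    keyToIndex.put(key, index);
    indexToOp.put(index, op);
  }

  /** Surviving ops in insertion order. */
  Iterable<String> inOrder() {
    return new TreeMap<>(indexToOp).values();
  }
}
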
@@ -332,6 +468,11 @@ void delete(CodecBuffer key) {
        overwriteIfExists(new DeleteOp(key));
      }

      void deleteRange(byte[] startKey, byte[] endKey) {
        delRangeCount++;
        ops.put(opIndex.getAndIncrement(), new DeleteRangeOperation(startKey, endKey));

Suggested change:
-       ops.put(opIndex.getAndIncrement(), new DeleteRangeOperation(startKey, endKey));
+       DeleteRangeOperation op = new DeleteRangeOperation(startKey, endKey);
+       batchSize += op.totalLength();
+       ops.put(opIndex.getAndIncrement(), op);
DeleteRangeOperation should be made static, since the enclosing instance is not used.
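To illustrate the reviewer's point with a generic example (not the PR's code): a non-static inner class keeps an implicit reference to its enclosing instance, whereas a static nested class does not.

public class Outer {
  private final byte[] largeState = new byte[1 << 20];

  // Holds an implicit Outer.this reference, keeping largeState reachable
  // for as long as an Inner instance is alive.
  class Inner { }

  // No implicit reference to Outer; can be created and retained
  // independently of any Outer instance.
  static class Nested { }
}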