@@ -73,6 +73,11 @@ public void deleteWithBatch(BatchOperation batch, KEY key) throws CodecException
table.deleteWithBatch(batch, key);
}

@Override
public void deleteRangeWithBatch(BatchOperation batch, KEY beginKey, KEY endKey) throws CodecException {
table.deleteRangeWithBatch(batch, beginKey, endKey);
}

@Override
public final KeyValueIterator<KEY, VALUE> iterator(KEY prefix, IteratorType type) {
throw new UnsupportedOperationException("Iterating tables directly is not" +
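For context, a caller-side sketch of how the new typed deleteRangeWithBatch fits into an existing batch. The store/table handles and the initBatchOperation/commitBatchOperation calls follow the usual DBStore pattern and are assumptions for illustration, not part of this change:

try (BatchOperation batch = store.initBatchOperation()) {
  table.putWithBatch(batch, someKey, someValue);
  // Queue a range delete for [beginKey, endKey) in the same batch.
  table.deleteRangeWithBatch(batch, beginKey, endKey);
  store.commitBatchOperation(batch); // all queued ops are applied together
}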
@@ -21,13 +21,19 @@

import com.google.common.base.Preconditions;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.hdds.utils.db.RocksDatabase.ColumnFamily;
import org.apache.hadoop.hdds.utils.db.managed.ManagedDirectSlice;
@@ -84,7 +90,7 @@ private static String countSize2String(int count, long size) {
* To implement {@link #equals(Object)} and {@link #hashCode()}
* based on the contents of the bytes.
*/
static final class Bytes implements Closeable {
static final class Bytes implements Comparable<Bytes>, Closeable {
private final AbstractSlice<?> slice;
/** Cache the hash value. */
private final int hash;
Expand Down Expand Up @@ -130,6 +136,12 @@ public String toString() {
return slice.toString();
}

// This method mimics the ByteWiseComparator in RocksDB.
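// That is, unsigned lexicographic (memcmp-style) ordering of the underlying bytes,
// so sorted Bytes keys line up with RocksDB's default key ordering.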
@Override
public int compareTo(RDBBatchOperation.Bytes that) {
return this.slice.compare(that.slice);
}

@Override
public void close() {
slice.close();
@@ -239,6 +251,40 @@ boolean closeImpl() {
}
}

/**
* Delete range operation to be applied to a {@link ColumnFamily} batch.
*/
private final class DeleteRangeOperation extends Op {
Copilot AI commented on Feb 2, 2026:
DeleteRangeOperation should be made static, since the enclosing instance is not used.

Suggested change:
- private final class DeleteRangeOperation extends Op {
+ private static final class DeleteRangeOperation extends Op {

private final byte[] startKey;
private final byte[] endKey;
private final RangeQueryIndex.Range<Bytes> rangeEntry;

private DeleteRangeOperation(byte[] startKey, byte[] endKey) {
this.startKey = Objects.requireNonNull(startKey, "startKey == null");
this.endKey = Objects.requireNonNull(endKey, "endKey == null");
this.rangeEntry = new RangeQueryIndex.Range<>(new Bytes(startKey), new Bytes(endKey));
}

@Override
public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException {
family.batchDeleteRange(batch, startKey, endKey);
}

@Override
int totalLength() {
return startKey.length + endKey.length;
}

@Override
boolean closeImpl() {
if (super.closeImpl()) {
IOUtils.close(LOG, rangeEntry.getStartInclusive(), rangeEntry.getEndExclusive());
return true;
}
return false;
}
}

/** Cache and deduplicate db ops (put/delete). */
private class OpCache {
/** A (family name -> {@link FamilyCache}) map. */
@@ -249,36 +295,123 @@ private class FamilyCache {
private final ColumnFamily family;

/**
* A mapping of keys to operations for batch processing in the {@link FamilyCache}.
* The keys are represented as {@link Bytes} objects, encapsulating the byte array or buffer
* for efficient equality and hashing. The values are instances of {@link Op}, representing
* different types of operations that can be applied to a {@link ColumnFamily}.
* A mapping from single-key operation keys to their operation indices in this {@code FamilyCache}.
*
* Keys are {@link Bytes} wrappers around the key bytes, giving cheap equality and hashing;
* each key maps to the {@code opIndex} of the latest put/delete recorded for it. That index
* locates the operation in {@link #ops}, so a newer op on the same key can discard and replace
* the older one before the batch is written.
*
* Delete-range operations are not tracked here; they are keyed only by {@code opIndex} in
* {@link #ops}.
*/
private final Map<Bytes, SingleKeyOp> ops = new HashMap<>();
private final Map<Bytes, Integer> singleOpKeys = new HashMap<>();
/**
* Maps each operation's {@code opIndex} to the corresponding {@link Op}.
*
* This is the primary container for the operations pending in this batch. The integer index
* records insertion order, and {@link #prepareBatchWrite()} replays the map in ascending index
* order, so puts, deletes and delete ranges reach RocksDB in the order they were issued
* against this {@code FamilyCache}.
*/
private final Map<Integer, Op> ops = new HashMap<>();
private boolean isCommit;

private long batchSize;
private long discardedSize;
private int discardedCount;
private int putCount;
private int delCount;
private int delRangeCount;
private AtomicInteger opIndex;

FamilyCache(ColumnFamily family) {
this.family = family;
this.opIndex = new AtomicInteger(0);
}

/** Prepare batch write for the entire family. */
/**
* Prepares the batch write for this column family.
*
* Steps:
* 1. Verifies the batch has not already been committed.
* 2. Sorts all recorded operations by {@code opIndex} so they are replayed in insertion order.
* 3. Collects every {@link DeleteRangeOperation} into a {@link RangeQueryIndex} and uses it to
* filter single-key operations: an op whose key falls inside a delete range recorded later is
* discarded, since that range delete would wipe it anyway.
* 4. Applies the remaining operations, including the range deletes themselves, to the write
* batch in order.
* 5. Logs a summary of the batch for debugging.
*
* @throws RocksDatabaseException if an operation cannot be applied to the write batch.
*/
void prepareBatchWrite() throws RocksDatabaseException {
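// Ordering example (illustrative keys, assuming k0 < k1 < k2 < k3):
//   opIndex 0: put(k1, v1)
//   opIndex 1: deleteRange([k0, k2))
//   opIndex 2: put(k3, v3)
// The put of k1 is discarded because a delete range recorded later covers it; the
// range delete and the put of k3 are applied to the write batch in opIndex order.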
Preconditions.checkState(!isCommit, "%s is already committed.", this);
isCommit = true;
for (Op op : ops.values()) {
op.apply(family, writeBatch);
// Sort the entries by opIndex and flush the operations to the batch in that order.
List<Op> opList = ops.entrySet().stream().sorted(Comparator.comparingInt(Map.Entry::getKey))
.map(Map.Entry::getValue).collect(Collectors.toList());
Set<RangeQueryIndex.Range<Bytes>> deleteRangeEntries = new HashSet<>();
for (Op op : opList) {
if (op instanceof DeleteRangeOperation) {
DeleteRangeOperation deleteRangeOp = (DeleteRangeOperation) op;
deleteRangeEntries.add(deleteRangeOp.rangeEntry);
}
}
try {
RangeQueryIndex<Bytes> rangeQueryIdx = new RangeQueryIndex<>(deleteRangeEntries);
for (Op op : opList) {
if (op instanceof DeleteRangeOperation) {
DeleteRangeOperation deleteRangeOp = (DeleteRangeOperation) op;
rangeQueryIdx.removeRange(deleteRangeOp.rangeEntry);
op.apply(family, writeBatch);
} else {
// Check whether a later delete range covers this key: if none does, apply the op;
// otherwise discard it, since that range delete will remove the key anyway.
SingleKeyOp singleKeyOp = (SingleKeyOp) op;
if (!rangeQueryIdx.containsIntersectingRange(singleKeyOp.getKeyBytes())) {
op.apply(family, writeBatch);
} else {
debug(() -> {
RangeQueryIndex.Range<Bytes> deleteRangeOp =
rangeQueryIdx.getFirstIntersectingRange(singleKeyOp.getKeyBytes());
return String.format("Discarding Operation with Key: %s as it falls within the range of [%s, %s)",
singleKeyOp.getKeyBytes(), deleteRangeOp.getStartInclusive(), deleteRangeOp.getEndExclusive());
});
discardedCount++;
discardedSize += op.totalLength();
}
}
}
debug(this::summary);
} catch (IOException e) {
throw new RocksDatabaseException("Failed to prepare batch write", e);
}
debug(this::summary);
}

private String summary() {
@@ -299,8 +432,10 @@ void clear() {
}

private void deleteIfExist(Bytes key) {
final SingleKeyOp previous = ops.remove(key);
if (previous != null) {
// remove previous first in order to call release()
Integer previousIndex = singleOpKeys.remove(key);
if (previousIndex != null) {
final SingleKeyOp previous = (SingleKeyOp) ops.remove(previousIndex);
previous.close();
discardedSize += previous.totalLength();
discardedCount++;
@@ -314,9 +449,10 @@ void overwriteIfExists(SingleKeyOp op) {
Bytes key = op.getKeyBytes();
deleteIfExist(key);
batchSize += op.totalLength();
Op overwritten = ops.put(key, op);
Preconditions.checkState(overwritten == null);

int newIndex = opIndex.getAndIncrement();
final Integer overwrittenOpKey = singleOpKeys.put(key, newIndex);
final Op overwrittenOp = ops.put(newIndex, op);
Preconditions.checkState(overwrittenOpKey == null && overwrittenOp == null);
debug(() -> String.format("%s %s, %s; key=%s", this,
op instanceof DeleteOp ? delString(op.totalLength()) : putString(op.keyLen(), op.valLen()),
batchSizeDiscardedString(), key));
@@ -332,6 +468,11 @@ void delete(CodecBuffer key) {
overwriteIfExists(new DeleteOp(key));
}

void deleteRange(byte[] startKey, byte[] endKey) {
delRangeCount++;
ops.put(opIndex.getAndIncrement(), new DeleteRangeOperation(startKey, endKey));
Copilot AI commented on Feb 2, 2026:
The deleteRange method doesn't update the batchSize field when adding a DeleteRangeOperation, unlike the put and delete methods which call overwriteIfExists and update batchSize accordingly. This inconsistency means that batch size tracking will be incomplete, potentially affecting batch operation monitoring and logging. Consider adding "batchSize += new DeleteRangeOperation(startKey, endKey).totalLength();" or restructuring to ensure batch size is properly tracked for delete range operations.

Suggested change:
- ops.put(opIndex.getAndIncrement(), new DeleteRangeOperation(startKey, endKey));
+ DeleteRangeOperation op = new DeleteRangeOperation(startKey, endKey);
+ batchSize += op.totalLength();
+ ops.put(opIndex.getAndIncrement(), op);

}

String putString(int keySize, int valueSize) {
return String.format("put(key: %s, value: %s), #put=%s",
byteSize2String(keySize), byteSize2String(valueSize), putCount);
@@ -361,6 +502,11 @@ void delete(ColumnFamily family, CodecBuffer key) {
name2cache.computeIfAbsent(family.getName(), k -> new FamilyCache(family)).delete(key);
}

void deleteRange(ColumnFamily family, byte[] startKey, byte[] endKey) {
name2cache.computeIfAbsent(family.getName(), k -> new FamilyCache(family))
.deleteRange(startKey, endKey);
}

/** Prepare batch write for the entire cache. */
UncheckedAutoCloseable prepareBatchWrite() throws RocksDatabaseException {
for (Map.Entry<String, FamilyCache> e : name2cache.entrySet()) {
@@ -382,19 +528,21 @@ String getCommitString() {
int opSize = 0;
int discardedCount = 0;
int discardedSize = 0;
int delRangeCount = 0;

for (FamilyCache f : name2cache.values()) {
putCount += f.putCount;
delCount += f.delCount;
opSize += f.batchSize;
discardedCount += f.discardedCount;
discardedSize += f.discardedSize;
delRangeCount += f.delRangeCount;
}

final int opCount = putCount + delCount;
return String.format(
"#put=%s, #del=%s, batchSize: %s, discarded: %s, committed: %s",
putCount, delCount,
"#put=%s, #del=%s, #delRange=%s, batchSize: %s, discarded: %s, committed: %s",
putCount, delCount, delRangeCount,
countSize2String(opCount, opSize),
countSize2String(discardedCount, discardedSize),
countSize2String(opCount - discardedCount, opSize - discardedSize));
@@ -449,4 +597,8 @@ public void put(ColumnFamily family, byte[] key, byte[] value) {
opCache.put(family, DIRECT_CODEC_BUFFER_CODEC.fromPersistedFormat(key),
DIRECT_CODEC_BUFFER_CODEC.fromPersistedFormat(value));
}

public void deleteRange(ColumnFamily family, byte[] startKey, byte[] endKey) {
opCache.deleteRange(family, startKey, endKey);
}
}
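RDBBatchOperation above relies on RangeQueryIndex (Range, removeRange, containsIntersectingRange, getFirstIntersectingRange), whose source is not part of the hunks shown. Below is a minimal sketch of that contract, inferred only from the call sites; the Comparable bound, the TreeMap-backed bodies, and the non-overlapping-ranges assumption are guesses rather than the actual implementation (which, judging by the surrounding try/catch, may also throw IOException):

import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Inferred sketch of the range index used by RDBBatchOperation; the real class may differ. */
class RangeQueryIndex<K extends Comparable<K>> {

  /** Half-open range [startInclusive, endExclusive). */
  static final class Range<K> {
    private final K startInclusive;
    private final K endExclusive;

    Range(K startInclusive, K endExclusive) {
      this.startInclusive = startInclusive;
      this.endExclusive = endExclusive;
    }

    K getStartInclusive() { return startInclusive; }
    K getEndExclusive() { return endExclusive; }
  }

  /** Ranges ordered by start key; assumes the indexed ranges do not overlap. */
  private final TreeMap<K, Range<K>> byStart = new TreeMap<>();

  RangeQueryIndex(Set<Range<K>> ranges) {
    ranges.forEach(r -> byStart.put(r.getStartInclusive(), r));
  }

  /** Drop a range once it has been applied, so ops recorded after it are no longer masked. */
  void removeRange(Range<K> range) {
    byStart.remove(range.getStartInclusive());
  }

  /** @return true if some remaining range contains {@code key}. */
  boolean containsIntersectingRange(K key) {
    return getFirstIntersectingRange(key) != null;
  }

  /** @return a remaining range containing {@code key}, or null if there is none. */
  Range<K> getFirstIntersectingRange(K key) {
    Map.Entry<K, Range<K>> floor = byStart.floorEntry(key);
    return floor != null && floor.getValue().getEndExclusive().compareTo(key) > 0
        ? floor.getValue()
        : null;
  }
}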
@@ -211,6 +211,15 @@ public void deleteWithBatch(BatchOperation batch, byte[] key) {

}

@Override
public void deleteRangeWithBatch(BatchOperation batch, byte[] beginKey, byte[] endKey) {
if (batch instanceof RDBBatchOperation) {
((RDBBatchOperation) batch).deleteRange(family, beginKey, endKey);
} else {
throw new IllegalArgumentException("batch should be RDBBatchOperation");
}
}

@Override
public KeyValueIterator<byte[], byte[]> iterator(byte[] prefix, IteratorType type)
throws RocksDatabaseException {
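At the raw byte[] level exposed by RDBTable, the range is half-open: [beginKey, endKey), matching Range.getStartInclusive()/getEndExclusive() above and RocksDB's DeleteRange semantics. A usage sketch; the dbStore/rawTable handles and the key contents are illustrative assumptions (java.nio.charset.StandardCharsets assumed imported):

byte[] k1 = "key1".getBytes(StandardCharsets.UTF_8);
byte[] k2 = "key2".getBytes(StandardCharsets.UTF_8);
byte[] k3 = "key3".getBytes(StandardCharsets.UTF_8);
byte[] v = "value".getBytes(StandardCharsets.UTF_8);

try (BatchOperation batch = dbStore.initBatchOperation()) {
  rawTable.putWithBatch(batch, k1, v);
  rawTable.putWithBatch(batch, k2, v);
  rawTable.putWithBatch(batch, k3, v);
  // Deletes ["key1", "key3"): key1 and key2 are gone after commit, key3 survives.
  // With the op cache above, the earlier puts of key1/key2 are also discarded from the batch.
  rawTable.deleteRangeWithBatch(batch, k1, k3);
  dbStore.commitBatchOperation(batch);
}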