Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
import static org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getLocateType;
import static org.apache.hadoop.hbase.client.ConnectionUtils.incMillisBetweenNextsMetrics;
import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics;
import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
import static org.apache.hadoop.hbase.client.ConnectionUtils.incRegionCountMetrics;
Expand Down Expand Up @@ -202,40 +203,42 @@ private CompletableFuture<OpenScannerResponse> callOpenScanner(HBaseRpcControlle
}
}

private void startScan(OpenScannerResponse resp) {
addListener(
// lastNextCallNanos is used to calculate the MILLIS_BETWEEN_NEXTS scan metrics
private void startScan(OpenScannerResponse resp, long lastNextCallNanos) {
AsyncScanSingleRegionRpcRetryingCaller scanSingleRegionCaller =
conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()).location(resp.loc)
.remote(resp.isRegionServerRemote)
.scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub)
.setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache)
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
.lastNextCallNanos(lastNextCallNanos)
.pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
.setRequestAttributes(requestAttributes).start(resp.controller, resp.resp),
(hasMore, error) -> {
try (Scope ignored = span.makeCurrent()) {
if (error != null) {
try {
consumer.onError(error);
return;
} finally {
TraceUtil.setError(span, error);
span.end();
}
.setRequestAttributes(requestAttributes).build();
addListener(scanSingleRegionCaller.start(resp.controller, resp.resp), (hasMore, error) -> {
try (Scope ignored = span.makeCurrent()) {
if (error != null) {
try {
consumer.onError(error);
return;
} finally {
TraceUtil.setError(span, error);
span.end();
}
if (hasMore) {
openScanner();
} else {
try {
consumer.onComplete();
} finally {
span.setStatus(StatusCode.OK);
span.end();
}
}
if (hasMore) {
openScanner(scanSingleRegionCaller);
} else {
try {
consumer.onComplete();
} finally {
span.setStatus(StatusCode.OK);
span.end();
}
}
});
}
});
}

private CompletableFuture<OpenScannerResponse> openScanner(int replicaId) {
Expand All @@ -256,11 +259,17 @@ private long getPrimaryTimeoutNs() {
: conn.connConf.getPrimaryScanTimeoutNs();
}

private void openScanner() {
private void openScanner(AsyncScanSingleRegionRpcRetryingCaller previousScanSingleRegionCaller) {
if (this.isScanMetricsByRegionEnabled) {
scanMetrics.moveToNextRegion();
}
incRegionCountMetrics(scanMetrics);
long openScannerStartNanos = System.nanoTime();
if (previousScanSingleRegionCaller != null) {
// open scanner is also a next call
incMillisBetweenNextsMetrics(scanMetrics, TimeUnit.NANOSECONDS
.toMillis(openScannerStartNanos - previousScanSingleRegionCaller.getLastNextCallNanos()));
}
openScannerTries.set(1);
addListener(timelineConsistentRead(conn.getLocator(), tableName, scan, scan.getStartRow(),
getLocateType(scan), this::openScanner, rpcTimeoutNs, getPrimaryTimeoutNs(), retryTimer,
Expand All @@ -280,14 +289,14 @@ private void openScanner() {
this.scanMetrics.initScanMetricsRegionInfo(loc.getRegion().getEncodedName(),
loc.getServerName());
}
startScan(resp);
startScan(resp, openScannerStartNanos);
}
});
}

public void start() {
try (Scope ignored = span.makeCurrent()) {
openScanner();
openScanner(null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ public class ScanSingleRegionCallerBuilder extends BuilderBase {

private long rpcTimeoutNs;

private long lastNextCallNanos = System.nanoTime();

private int priority = PRIORITY_UNSET;

private Map<String, byte[]> requestAttributes = Collections.emptyMap();
Expand Down Expand Up @@ -275,6 +277,11 @@ public ScanSingleRegionCallerBuilder pause(long pause, TimeUnit unit) {
return this;
}

public ScanSingleRegionCallerBuilder lastNextCallNanos(long nanos) {
this.lastNextCallNanos = nanos;
return this;
}

public ScanSingleRegionCallerBuilder pauseForServerOverloaded(long pause, TimeUnit unit) {
this.pauseNsForServerOverloaded = unit.toNanos(pause);
return this;
Expand Down Expand Up @@ -311,7 +318,7 @@ public AsyncScanSingleRegionRpcRetryingCaller build() {
return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn, scan, scanMetrics,
scannerId, resultCache, consumer, stub, loc, isRegionServerRemote, priority,
scannerLeaseTimeoutPeriodNs, pauseNs, pauseNsForServerOverloaded, maxAttempts,
scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, requestAttributes);
scanTimeoutNs, rpcTimeoutNs, lastNextCallNanos, startLogErrorsCnt, requestAttributes);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.ConnectionUtils.incMillisBetweenNextsMetrics;
import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics;
import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan;
Expand Down Expand Up @@ -120,7 +121,9 @@ class AsyncScanSingleRegionRpcRetryingCaller {

private boolean includeNextStartRowWhenError;

private long nextCallStartNs;
private long lastNextCallNanos;

private long nextCallStartNanos;

private int tries;

Expand Down Expand Up @@ -334,7 +337,7 @@ public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionI
AdvancedScanResultConsumer consumer, Interface stub, HRegionLocation loc,
boolean isRegionServerRemote, int priority, long scannerLeaseTimeoutPeriodNs, long pauseNs,
long pauseNsForServerOverloaded, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs,
int startLogErrorsCnt, Map<String, byte[]> requestAttributes) {
long lastNextCallNanos, int startLogErrorsCnt, Map<String, byte[]> requestAttributes) {
this.retryTimer = retryTimer;
this.conn = conn;
this.scan = scan;
Expand All @@ -349,6 +352,7 @@ public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionI
this.maxAttempts = maxAttempts;
this.scanTimeoutNs = scanTimeoutNs;
this.rpcTimeoutNs = rpcTimeoutNs;
this.lastNextCallNanos = lastNextCallNanos;
this.startLogErrorsCnt = startLogErrorsCnt;
if (scan.isReversed()) {
completeWhenNoMoreResultsInRegion = this::completeReversedWhenNoMoreResultsInRegion;
Expand All @@ -365,8 +369,12 @@ public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionI
new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded, scanTimeoutNs);
}

public long getLastNextCallNanos() {
return lastNextCallNanos;
}

private long elapsedMs() {
return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nextCallStartNs);
return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nextCallStartNanos);
}

private void closeScanner() {
Expand Down Expand Up @@ -433,7 +441,7 @@ private void onError(Throwable error) {
}

OptionalLong maybePauseNsToUse =
pauseManager.getPauseNsFromException(error, tries, nextCallStartNs);
pauseManager.getPauseNsFromException(error, tries, nextCallStartNanos);
if (!maybePauseNsToUse.isPresent()) {
completeExceptionally(!scannerClosed);
return;
Expand Down Expand Up @@ -582,7 +590,7 @@ private void call() {
// new one.
long callTimeoutNs;
if (scanTimeoutNs > 0) {
long remainingNs = scanTimeoutNs - (System.nanoTime() - nextCallStartNs);
long remainingNs = scanTimeoutNs - (System.nanoTime() - nextCallStartNanos);
if (remainingNs <= 0) {
completeExceptionally(true);
return;
Expand Down Expand Up @@ -610,7 +618,10 @@ private void next() {
nextCallSeq++;
tries = 1;
exceptions.clear();
nextCallStartNs = System.nanoTime();
nextCallStartNanos = System.nanoTime();
incMillisBetweenNextsMetrics(scanMetrics,
TimeUnit.NANOSECONDS.toMillis(nextCallStartNanos - lastNextCallNanos));
lastNextCallNanos = nextCallStartNanos;
call();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,13 @@ static void incRegionCountMetrics(ScanMetrics scanMetrics) {
scanMetrics.addToCounter(ScanMetrics.REGIONS_SCANNED_METRIC_NAME, 1);
}

static void incMillisBetweenNextsMetrics(ScanMetrics scanMetrics, long millis) {
if (scanMetrics == null) {
return;
}
scanMetrics.addToCounter(ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME, millis);
}

/**
* Connect the two futures, if the src future is done, then mark the dst future as done. And if
* the dst future is done, then cancel the src future. This is used for timeline consistent read.
Expand Down
Loading