diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java index d58c8e60c8d8..8d3bcb000bc3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java @@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW; import static org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache; import static org.apache.hadoop.hbase.client.ConnectionUtils.getLocateType; +import static org.apache.hadoop.hbase.client.ConnectionUtils.incMillisBetweenNextsMetrics; import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics; import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics; import static org.apache.hadoop.hbase.client.ConnectionUtils.incRegionCountMetrics; @@ -202,40 +203,42 @@ private CompletableFuture callOpenScanner(HBaseRpcControlle } } - private void startScan(OpenScannerResponse resp) { - addListener( + // lastNextCallNanos is used to calculate the MILLIS_BETWEEN_NEXTS scan metrics + private void startScan(OpenScannerResponse resp, long lastNextCallNanos) { + AsyncScanSingleRegionRpcRetryingCaller scanSingleRegionCaller = conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()).location(resp.loc) .remote(resp.isRegionServerRemote) .scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub) .setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache) .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) + .lastNextCallNanos(lastNextCallNanos) .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt) - .setRequestAttributes(requestAttributes).start(resp.controller, resp.resp), - (hasMore, error) -> { - try (Scope ignored = span.makeCurrent()) { - if (error != null) { - try { - consumer.onError(error); - return; - } finally { - TraceUtil.setError(span, error); - span.end(); - } + .setRequestAttributes(requestAttributes).build(); + addListener(scanSingleRegionCaller.start(resp.controller, resp.resp), (hasMore, error) -> { + try (Scope ignored = span.makeCurrent()) { + if (error != null) { + try { + consumer.onError(error); + return; + } finally { + TraceUtil.setError(span, error); + span.end(); } - if (hasMore) { - openScanner(); - } else { - try { - consumer.onComplete(); - } finally { - span.setStatus(StatusCode.OK); - span.end(); - } + } + if (hasMore) { + openScanner(scanSingleRegionCaller); + } else { + try { + consumer.onComplete(); + } finally { + span.setStatus(StatusCode.OK); + span.end(); } } - }); + } + }); } private CompletableFuture openScanner(int replicaId) { @@ -256,11 +259,17 @@ private long getPrimaryTimeoutNs() { : conn.connConf.getPrimaryScanTimeoutNs(); } - private void openScanner() { + private void openScanner(AsyncScanSingleRegionRpcRetryingCaller previousScanSingleRegionCaller) { if (this.isScanMetricsByRegionEnabled) { scanMetrics.moveToNextRegion(); } incRegionCountMetrics(scanMetrics); + long openScannerStartNanos = System.nanoTime(); + if (previousScanSingleRegionCaller != null) { + // open scanner is also a next call + incMillisBetweenNextsMetrics(scanMetrics, TimeUnit.NANOSECONDS + .toMillis(openScannerStartNanos - previousScanSingleRegionCaller.getLastNextCallNanos())); + } openScannerTries.set(1); addListener(timelineConsistentRead(conn.getLocator(), tableName, scan, scan.getStartRow(), getLocateType(scan), this::openScanner, rpcTimeoutNs, getPrimaryTimeoutNs(), retryTimer, @@ -280,14 +289,14 @@ private void openScanner() { this.scanMetrics.initScanMetricsRegionInfo(loc.getRegion().getEncodedName(), loc.getServerName()); } - startScan(resp); + startScan(resp, openScannerStartNanos); } }); } public void start() { try (Scope ignored = span.makeCurrent()) { - openScanner(); + openScanner(null); } } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java index 1ea2a1ad7dd4..50d9d36a2b58 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java @@ -209,6 +209,8 @@ public class ScanSingleRegionCallerBuilder extends BuilderBase { private long rpcTimeoutNs; + private long lastNextCallNanos = System.nanoTime(); + private int priority = PRIORITY_UNSET; private Map requestAttributes = Collections.emptyMap(); @@ -275,6 +277,11 @@ public ScanSingleRegionCallerBuilder pause(long pause, TimeUnit unit) { return this; } + public ScanSingleRegionCallerBuilder lastNextCallNanos(long nanos) { + this.lastNextCallNanos = nanos; + return this; + } + public ScanSingleRegionCallerBuilder pauseForServerOverloaded(long pause, TimeUnit unit) { this.pauseNsForServerOverloaded = unit.toNanos(pause); return this; @@ -311,7 +318,7 @@ public AsyncScanSingleRegionRpcRetryingCaller build() { return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn, scan, scanMetrics, scannerId, resultCache, consumer, stub, loc, isRegionServerRemote, priority, scannerLeaseTimeoutPeriodNs, pauseNs, pauseNsForServerOverloaded, maxAttempts, - scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, requestAttributes); + scanTimeoutNs, rpcTimeoutNs, lastNextCallNanos, startLogErrorsCnt, requestAttributes); } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java index 51a9a07c9a2c..a9c7092921c0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.client.ConnectionUtils.incMillisBetweenNextsMetrics; import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics; import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics; import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan; @@ -120,7 +121,9 @@ class AsyncScanSingleRegionRpcRetryingCaller { private boolean includeNextStartRowWhenError; - private long nextCallStartNs; + private long lastNextCallNanos; + + private long nextCallStartNanos; private int tries; @@ -334,7 +337,7 @@ public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionI AdvancedScanResultConsumer consumer, Interface stub, HRegionLocation loc, boolean isRegionServerRemote, int priority, long scannerLeaseTimeoutPeriodNs, long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, - int startLogErrorsCnt, Map requestAttributes) { + long lastNextCallNanos, int startLogErrorsCnt, Map requestAttributes) { this.retryTimer = retryTimer; this.conn = conn; this.scan = scan; @@ -349,6 +352,7 @@ public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionI this.maxAttempts = maxAttempts; this.scanTimeoutNs = scanTimeoutNs; this.rpcTimeoutNs = rpcTimeoutNs; + this.lastNextCallNanos = lastNextCallNanos; this.startLogErrorsCnt = startLogErrorsCnt; if (scan.isReversed()) { completeWhenNoMoreResultsInRegion = this::completeReversedWhenNoMoreResultsInRegion; @@ -365,8 +369,12 @@ public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionI new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded, scanTimeoutNs); } + public long getLastNextCallNanos() { + return lastNextCallNanos; + } + private long elapsedMs() { - return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nextCallStartNs); + return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nextCallStartNanos); } private void closeScanner() { @@ -433,7 +441,7 @@ private void onError(Throwable error) { } OptionalLong maybePauseNsToUse = - pauseManager.getPauseNsFromException(error, tries, nextCallStartNs); + pauseManager.getPauseNsFromException(error, tries, nextCallStartNanos); if (!maybePauseNsToUse.isPresent()) { completeExceptionally(!scannerClosed); return; @@ -582,7 +590,7 @@ private void call() { // new one. long callTimeoutNs; if (scanTimeoutNs > 0) { - long remainingNs = scanTimeoutNs - (System.nanoTime() - nextCallStartNs); + long remainingNs = scanTimeoutNs - (System.nanoTime() - nextCallStartNanos); if (remainingNs <= 0) { completeExceptionally(true); return; @@ -610,7 +618,10 @@ private void next() { nextCallSeq++; tries = 1; exceptions.clear(); - nextCallStartNs = System.nanoTime(); + nextCallStartNanos = System.nanoTime(); + incMillisBetweenNextsMetrics(scanMetrics, + TimeUnit.NANOSECONDS.toMillis(nextCallStartNanos - lastNextCallNanos)); + lastNextCallNanos = nextCallStartNanos; call(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index 40e205ddca86..fd741a2658b6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -393,6 +393,13 @@ static void incRegionCountMetrics(ScanMetrics scanMetrics) { scanMetrics.addToCounter(ScanMetrics.REGIONS_SCANNED_METRIC_NAME, 1); } + static void incMillisBetweenNextsMetrics(ScanMetrics scanMetrics, long millis) { + if (scanMetrics == null) { + return; + } + scanMetrics.addToCounter(ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME, millis); + } + /** * Connect the two futures, if the src future is done, then mark the dst future as done. And if * the dst future is done, then cancel the src future. This is used for timeline consistent read. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java index 6678e94b7e0c..84cc7cc83363 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java @@ -18,13 +18,17 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME; -import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME; import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME; import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_CALLS_METRIC_NAME; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; +import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.both; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.lessThan; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.IOException; import java.util.ArrayList; @@ -33,7 +37,9 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ForkJoinPool; -import org.apache.hadoop.hbase.HBaseClassTestRule; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; +import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate; import org.apache.hadoop.hbase.HBaseTestingUtil; import org.apache.hadoop.hbase.PrivateCellUtil; import org.apache.hadoop.hbase.TableName; @@ -43,26 +49,19 @@ import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameter; -import org.junit.runners.Parameterized.Parameters; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.params.provider.Arguments; import org.apache.hbase.thirdparty.com.google.common.io.Closeables; -@RunWith(Parameterized.class) -@Category({ MediumTests.class, ClientTests.class }) +@Tag(MediumTests.TAG) +@Tag(ClientTests.TAG) +@HBaseParameterizedTestTemplate(name = "{index}: scan={0}") public class TestAsyncTableScanMetrics { - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestAsyncTableScanMetrics.class); - private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); private static final TableName TABLE_NAME = TableName.valueOf("ScanMetrics"); @@ -82,24 +81,24 @@ private interface ScanWithMetrics { Pair, ScanMetrics> scan(Scan scan) throws Exception; } - @Parameter(0) - public String methodName; + private ScanWithMetrics method; - @Parameter(1) - public ScanWithMetrics method; + // methodName is just for naming + public TestAsyncTableScanMetrics(String methodName, ScanWithMetrics method) { + this.method = method; + } - @Parameters(name = "{index}: scan={0}") - public static List params() { + public static Stream parameters() { ScanWithMetrics doScanWithRawAsyncTable = TestAsyncTableScanMetrics::doScanWithRawAsyncTable; ScanWithMetrics doScanWithAsyncTableScan = TestAsyncTableScanMetrics::doScanWithAsyncTableScan; ScanWithMetrics doScanWithAsyncTableScanner = TestAsyncTableScanMetrics::doScanWithAsyncTableScanner; - return Arrays.asList(new Object[] { "doScanWithRawAsyncTable", doScanWithRawAsyncTable }, - new Object[] { "doScanWithAsyncTableScan", doScanWithAsyncTableScan }, - new Object[] { "doScanWithAsyncTableScanner", doScanWithAsyncTableScanner }); + return Stream.of(Arguments.of("doScanWithRawAsyncTable", doScanWithRawAsyncTable), + Arguments.of("doScanWithAsyncTableScan", doScanWithAsyncTableScan), + Arguments.of("doScanWithAsyncTableScanner", doScanWithAsyncTableScanner)); } - @BeforeClass + @BeforeAll public static void setUp() throws Exception { UTIL.startMiniCluster(3); // Create 3 rows in the table, with rowkeys starting with "xxx*", "yyy*" and "zzz*" so that @@ -113,7 +112,7 @@ public static void setUp() throws Exception { NUM_REGIONS = UTIL.getHBaseCluster().getRegions(TABLE_NAME).size(); } - @AfterClass + @AfterAll public static void tearDown() throws Exception { Closeables.close(CONN, true); UTIL.shutdownMiniCluster(); @@ -149,7 +148,7 @@ private static Pair, ScanMetrics> doScanWithAsyncTableScanner(Scan } } - @Test + @TestTemplate public void testScanMetricsDisabled() throws Exception { Pair, ScanMetrics> pair = method.scan(new Scan()); assertEquals(3, pair.getFirst().size()); @@ -157,11 +156,13 @@ public void testScanMetricsDisabled() throws Exception { assertNull(pair.getSecond()); } - @Test + @TestTemplate public void testScanMetricsWithScanMetricsByRegionDisabled() throws Exception { Scan scan = new Scan(); scan.setScanMetricsEnabled(true); + long startNanos = System.nanoTime(); Pair, ScanMetrics> pair = method.scan(scan); + long endNanos = System.nanoTime(); List results = pair.getFirst(); assertEquals(3, results.size()); long bytes = getBytesOfResults(results); @@ -171,9 +172,11 @@ public void testScanMetricsWithScanMetricsByRegionDisabled() throws Exception { assertEquals(NUM_REGIONS, scanMetrics.countOfRPCcalls.get()); // Assert scan metrics have not been collected by region assertTrue(scanMetrics.collectMetricsByRegion().isEmpty()); + assertThat(scanMetrics.sumOfMillisSecBetweenNexts.get(), + both(greaterThan(0L)).and(lessThan(TimeUnit.NANOSECONDS.toMillis(endNanos - startNanos)))); } - @Test + @TestTemplate public void testScanMetricsByRegionForSingleRegionScan() throws Exception { Scan scan = new Scan(); scan.withStartRow(Bytes.toBytes("zzz1"), true); @@ -201,13 +204,17 @@ public void testScanMetricsByRegionForSingleRegionScan() throws Exception { // was scanned. assertEquals(scanMetrics.getMetricsMap(false), metrics); } + // we only have 1 rpc call so there is no millis 'between nexts' + assertEquals(0, scanMetrics.sumOfMillisSecBetweenNexts.get()); } - @Test + @TestTemplate public void testScanMetricsByRegionForMultiRegionScan() throws Exception { Scan scan = new Scan(); scan.setEnableScanMetricsByRegion(true); + long startNanos = System.nanoTime(); Pair, ScanMetrics> pair = method.scan(scan); + long endNanos = System.nanoTime(); List results = pair.getFirst(); assertEquals(3, results.size()); long bytes = getBytesOfResults(results); @@ -241,6 +248,8 @@ public void testScanMetricsByRegionForMultiRegionScan() throws Exception { } } assertEquals(3, rowsScannedAcrossAllRegions); + assertThat(scanMetrics.sumOfMillisSecBetweenNexts.get(), + both(greaterThan(0L)).and(lessThan(TimeUnit.NANOSECONDS.toMillis(endNanos - startNanos)))); } static long getBytesOfResults(List results) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableScanMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableScanMetrics.java index c5186ad79c3f..607bfc228239 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableScanMetrics.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableScanMetrics.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.HConstants.EMPTY_BYTE_ARRAY; +import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME; import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME; import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_RETRIES_METRIC_NAME; import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME; @@ -336,6 +337,8 @@ public void run() { .entrySet()) { ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey(); Map metricsMap = entry.getValue(); + // Remove millis between nexts metric as it is not deterministic + metricsMap.remove(MILLIS_BETWEEN_NEXTS_METRIC_NAME); metricsMap.remove(RPC_SCAN_PROCESSING_TIME_METRIC_NAME); metricsMap.remove(RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME); Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName()); @@ -639,6 +642,8 @@ private void mergeScanMetricsByRegion(Map> entry : srcMap.entrySet()) { ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey(); Map metricsMap = entry.getValue(); + // Remove millis between nexts metric as it is not deterministic + metricsMap.remove(MILLIS_BETWEEN_NEXTS_METRIC_NAME); metricsMap.remove(RPC_SCAN_PROCESSING_TIME_METRIC_NAME); metricsMap.remove(RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME); if (dstMap.containsKey(scanMetricsRegionInfo)) {