-
Notifications
You must be signed in to change notification settings - Fork 3.4k
HBASE-30089 Rewrite AbstractTestAsyncTableScan and related sub classes #8099
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,10 +27,11 @@ | |
| import static org.hamcrest.Matchers.hasProperty; | ||
| import static org.hamcrest.Matchers.is; | ||
| import static org.hamcrest.Matchers.isA; | ||
| import static org.junit.Assert.assertEquals; | ||
| import static org.junit.Assert.assertThrows; | ||
| import static org.junit.Assert.fail; | ||
| import static org.junit.jupiter.api.Assertions.assertEquals; | ||
| import static org.junit.jupiter.api.Assertions.assertThrows; | ||
| import static org.junit.jupiter.api.Assertions.fail; | ||
|
|
||
| import io.opentelemetry.sdk.testing.junit5.OpenTelemetryExtension; | ||
| import io.opentelemetry.sdk.trace.data.SpanData; | ||
| import java.io.IOException; | ||
| import java.io.UncheckedIOException; | ||
|
|
@@ -44,82 +45,66 @@ | |
| import java.util.stream.Collectors; | ||
| import java.util.stream.IntStream; | ||
| import java.util.stream.Stream; | ||
| import org.apache.hadoop.conf.Configuration; | ||
| import org.apache.hadoop.hbase.ConnectionRule; | ||
| import org.apache.hadoop.hbase.HBaseTestingUtil; | ||
| import org.apache.hadoop.hbase.MatcherPredicate; | ||
| import org.apache.hadoop.hbase.MiniClusterRule; | ||
| import org.apache.hadoop.hbase.StartTestingClusterOption; | ||
| import org.apache.hadoop.hbase.TableName; | ||
| import org.apache.hadoop.hbase.Waiter; | ||
| import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException; | ||
| import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; | ||
| import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes; | ||
| import org.apache.hadoop.hbase.trace.OpenTelemetryClassRule; | ||
| import org.apache.hadoop.hbase.trace.OpenTelemetryTestRule; | ||
| import org.apache.hadoop.hbase.trace.TraceUtil; | ||
| import org.apache.hadoop.hbase.util.Bytes; | ||
| import org.apache.hadoop.hbase.util.JVMClusterUtil; | ||
| import org.apache.hadoop.hbase.util.Pair; | ||
| import org.hamcrest.Matcher; | ||
| import org.junit.ClassRule; | ||
| import org.junit.Rule; | ||
| import org.junit.Test; | ||
| import org.junit.rules.ExternalResource; | ||
| import org.junit.rules.RuleChain; | ||
| import org.junit.rules.TestName; | ||
| import org.junit.rules.TestRule; | ||
| import org.junit.jupiter.api.AfterAll; | ||
| import org.junit.jupiter.api.BeforeAll; | ||
| import org.junit.jupiter.api.BeforeEach; | ||
| import org.junit.jupiter.api.TestInfo; | ||
| import org.junit.jupiter.api.TestTemplate; | ||
| import org.junit.jupiter.api.extension.RegisterExtension; | ||
| import org.junit.jupiter.params.provider.Arguments; | ||
|
|
||
| public abstract class AbstractTestAsyncTableScan { | ||
| import org.apache.hbase.thirdparty.com.google.common.io.Closeables; | ||
|
|
||
| protected static final OpenTelemetryClassRule OTEL_CLASS_RULE = OpenTelemetryClassRule.create(); | ||
| public abstract class AbstractTestAsyncTableScan { | ||
|
|
||
| private static Configuration createConfiguration() { | ||
| Configuration conf = new Configuration(); | ||
| // Disable directory sharing to prevent race conditions when tests run in parallel. | ||
| // Each test instance gets its own isolated directories to avoid one test's tearDown() | ||
| // deleting directories another parallel test is still using. | ||
| conf.setBoolean("hbase.test.disable-directory-sharing", true); | ||
| return conf; | ||
| } | ||
| @RegisterExtension | ||
| protected static final OpenTelemetryExtension OTEL_EXT = OpenTelemetryExtension.create(); | ||
|
|
||
| protected static final MiniClusterRule MINI_CLUSTER_RULE = | ||
| MiniClusterRule.newBuilder().setConfiguration(createConfiguration()) | ||
| .setMiniClusterOption(StartTestingClusterOption.builder().numWorkers(3).build()).build(); | ||
| protected static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); | ||
|
|
||
| protected static final ConnectionRule CONN_RULE = | ||
| ConnectionRule.createAsyncConnectionRule(MINI_CLUSTER_RULE::createAsyncConnection); | ||
| protected static AsyncConnection CONN; | ||
|
|
||
| private static final class Setup extends ExternalResource { | ||
| @Override | ||
| protected void before() throws Throwable { | ||
| final HBaseTestingUtil testingUtil = MINI_CLUSTER_RULE.getTestingUtility(); | ||
| final AsyncConnection conn = CONN_RULE.getAsyncConnection(); | ||
| protected String methodName; | ||
|
|
||
| byte[][] splitKeys = new byte[8][]; | ||
| for (int i = 111; i < 999; i += 111) { | ||
| splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i)); | ||
| } | ||
| testingUtil.createTable(TABLE_NAME, FAMILY, splitKeys); | ||
| testingUtil.waitTableAvailable(TABLE_NAME); | ||
| conn.getTable(TABLE_NAME) | ||
| .putAll(IntStream.range(0, COUNT) | ||
| .mapToObj(i -> new Put(Bytes.toBytes(String.format("%03d", i))) | ||
| .addColumn(FAMILY, CQ1, Bytes.toBytes(i)).addColumn(FAMILY, CQ2, Bytes.toBytes(i * i))) | ||
| .collect(Collectors.toList())) | ||
| .get(); | ||
| @BeforeAll | ||
| public static void setUpBeforeClass() throws Exception { | ||
| UTIL.startMiniCluster(3); | ||
| byte[][] splitKeys = new byte[8][]; | ||
| for (int i = 111; i < 999; i += 111) { | ||
| splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i)); | ||
| } | ||
| UTIL.createTable(TABLE_NAME, FAMILY, splitKeys); | ||
| UTIL.waitTableAvailable(TABLE_NAME); | ||
| try (Table table = UTIL.getConnection().getTable(TABLE_NAME)) { | ||
| table.put(IntStream.range(0, COUNT) | ||
| .mapToObj(i -> new Put(Bytes.toBytes(String.format("%03d", i))) | ||
| .addColumn(FAMILY, CQ1, Bytes.toBytes(i)).addColumn(FAMILY, CQ2, Bytes.toBytes(i * i))) | ||
| .collect(Collectors.toList())); | ||
| } | ||
| CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get(); | ||
| } | ||
|
|
||
| @ClassRule | ||
| public static final TestRule classRule = RuleChain.outerRule(OTEL_CLASS_RULE) | ||
| .around(MINI_CLUSTER_RULE).around(CONN_RULE).around(new Setup()); | ||
|
|
||
| @Rule | ||
| public final OpenTelemetryTestRule otelTestRule = new OpenTelemetryTestRule(OTEL_CLASS_RULE); | ||
| @AfterAll | ||
| public static void tearDownAfterClass() throws Exception { | ||
| Closeables.close(CONN, true); | ||
| UTIL.shutdownMiniCluster(); | ||
| } | ||
|
|
||
| @Rule | ||
| public final TestName testName = new TestName(); | ||
| @BeforeEach | ||
| public void setUp(TestInfo testInfo) { | ||
| methodName = testInfo.getTestMethod().get().getName(); | ||
| } | ||
|
|
||
| protected static TableName TABLE_NAME = TableName.valueOf("async"); | ||
|
|
||
|
|
@@ -149,11 +134,11 @@ private static Scan createBatchSmallResultSizeScan() { | |
| } | ||
|
|
||
| private static AsyncTable<?> getRawTable() { | ||
| return CONN_RULE.getAsyncConnection().getTable(TABLE_NAME); | ||
| return CONN.getTable(TABLE_NAME); | ||
| } | ||
|
|
||
| private static AsyncTable<?> getTable() { | ||
| return CONN_RULE.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool()); | ||
| return CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()); | ||
| } | ||
|
|
||
| private static List<Pair<String, Supplier<Scan>>> getScanCreator() { | ||
|
|
@@ -164,23 +149,20 @@ private static List<Pair<String, Supplier<Scan>>> getScanCreator() { | |
| AbstractTestAsyncTableScan::createBatchSmallResultSizeScan)); | ||
| } | ||
|
|
||
| protected static List<Object[]> getScanCreatorParams() { | ||
| return getScanCreator().stream().map(p -> new Object[] { p.getFirst(), p.getSecond() }) | ||
| .collect(Collectors.toList()); | ||
| protected static Stream<Arguments> getScanCreatorParams() { | ||
| return getScanCreator().stream().map(p -> Arguments.of(p.getFirst(), p.getSecond())); | ||
| } | ||
|
|
||
| private static List<Pair<String, Supplier<AsyncTable<?>>>> getTableCreator() { | ||
| return Arrays.asList(Pair.newPair("raw", AbstractTestAsyncTableScan::getRawTable), | ||
| Pair.newPair("normal", AbstractTestAsyncTableScan::getTable)); | ||
| } | ||
|
|
||
| protected static List<Object[]> getTableAndScanCreatorParams() { | ||
| protected static Stream<Arguments> getTableAndScanCreatorParams() { | ||
| List<Pair<String, Supplier<AsyncTable<?>>>> tableCreator = getTableCreator(); | ||
| List<Pair<String, Supplier<Scan>>> scanCreator = getScanCreator(); | ||
| return tableCreator.stream() | ||
| .flatMap(tp -> scanCreator.stream() | ||
| .map(sp -> new Object[] { tp.getFirst(), tp.getSecond(), sp.getFirst(), sp.getSecond() })) | ||
| .collect(Collectors.toList()); | ||
| return tableCreator.stream().flatMap(tp -> scanCreator.stream() | ||
| .map(sp -> Arguments.of(tp.getFirst(), tp.getSecond(), sp.getFirst(), sp.getSecond()))); | ||
| } | ||
|
|
||
| protected abstract Scan createScan(); | ||
|
|
@@ -211,25 +193,22 @@ protected final List<Result> convertFromBatchResult(List<Result> results) { | |
| } | ||
|
|
||
| protected static void waitForSpan(final Matcher<SpanData> parentSpanMatcher) { | ||
| final Configuration conf = MINI_CLUSTER_RULE.getTestingUtility().getConfiguration(); | ||
| Waiter.waitFor(conf, TimeUnit.SECONDS.toMillis(5), new MatcherPredicate<>( | ||
| "Span for test failed to complete.", OTEL_CLASS_RULE::getSpans, hasItem(parentSpanMatcher))); | ||
| UTIL.waitFor(TimeUnit.SECONDS.toMillis(5), new MatcherPredicate<>( | ||
| "Span for test failed to complete.", OTEL_EXT::getSpans, hasItem(parentSpanMatcher))); | ||
| } | ||
|
|
||
| protected static Stream<SpanData> spanStream() { | ||
| return OTEL_CLASS_RULE.getSpans().stream().filter(Objects::nonNull); | ||
| return OTEL_EXT.getSpans().stream().filter(Objects::nonNull); | ||
| } | ||
|
|
||
| @Test | ||
| @TestTemplate | ||
| public void testScanAll() throws Exception { | ||
| List<Result> results = doScan(createScan(), -1); | ||
| // make sure all scanners are closed at RS side | ||
| MINI_CLUSTER_RULE.getTestingUtility().getHBaseCluster().getRegionServerThreads().stream() | ||
| UTIL.getHBaseCluster().getRegionServerThreads().stream() | ||
| .map(JVMClusterUtil.RegionServerThread::getRegionServer).forEach( | ||
| rs -> assertEquals( | ||
| "The scanner count of " + rs.getServerName() + " is " | ||
| + rs.getRSRpcServices().getScannersCount(), | ||
| 0, rs.getRSRpcServices().getScannersCount())); | ||
| rs -> assertEquals(0, rs.getRSRpcServices().getScannersCount(), "The scanner count of " | ||
| + rs.getServerName() + " is " + rs.getRSRpcServices().getScannersCount())); | ||
|
Comment on lines
+208
to
+211
|
||
| assertEquals(COUNT, results.size()); | ||
| IntStream.range(0, COUNT).forEach(i -> { | ||
| Result result = results.get(i); | ||
|
|
@@ -244,43 +223,41 @@ private void assertResultEquals(Result result, int i) { | |
| assertEquals(i * i, Bytes.toInt(result.getValue(FAMILY, CQ2))); | ||
| } | ||
|
|
||
| @Test | ||
| @TestTemplate | ||
| public void testReversedScanAll() throws Exception { | ||
| List<Result> results = | ||
| TraceUtil.trace(() -> doScan(createScan().setReversed(true), -1), testName.getMethodName()); | ||
| TraceUtil.trace(() -> doScan(createScan().setReversed(true), -1), methodName); | ||
| assertEquals(COUNT, results.size()); | ||
| IntStream.range(0, COUNT).forEach(i -> assertResultEquals(results.get(i), COUNT - i - 1)); | ||
| assertTraceContinuity(); | ||
| } | ||
|
|
||
| @Test | ||
| @TestTemplate | ||
| public void testScanNoStopKey() throws Exception { | ||
| int start = 345; | ||
| List<Result> results = TraceUtil.trace( | ||
| () -> doScan(createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))), -1), | ||
| testName.getMethodName()); | ||
| methodName); | ||
| assertEquals(COUNT - start, results.size()); | ||
| IntStream.range(0, COUNT - start).forEach(i -> assertResultEquals(results.get(i), start + i)); | ||
| assertTraceContinuity(); | ||
| } | ||
|
|
||
| @Test | ||
| @TestTemplate | ||
| public void testReverseScanNoStopKey() throws Exception { | ||
| int start = 765; | ||
| final Scan scan = | ||
| createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))).setReversed(true); | ||
| List<Result> results = TraceUtil.trace(() -> doScan(scan, -1), testName.getMethodName()); | ||
| List<Result> results = TraceUtil.trace(() -> doScan(scan, -1), methodName); | ||
| assertEquals(start + 1, results.size()); | ||
| IntStream.range(0, start + 1).forEach(i -> assertResultEquals(results.get(i), start - i)); | ||
| assertTraceContinuity(); | ||
| } | ||
|
|
||
| @Test | ||
| @TestTemplate | ||
| public void testScanWrongColumnFamily() { | ||
| final Exception e = assertThrows(Exception.class, | ||
| () -> TraceUtil.trace( | ||
| () -> doScan(createScan().addFamily(Bytes.toBytes("WrongColumnFamily")), -1), | ||
| testName.getMethodName())); | ||
| final Exception e = assertThrows(Exception.class, () -> TraceUtil.trace( | ||
| () -> doScan(createScan().addFamily(Bytes.toBytes("WrongColumnFamily")), -1), methodName)); | ||
| // hamcrest generic enforcement for `anyOf` is a pain; skip it | ||
| // but -- don't we always unwrap ExecutionExceptions -- bug? | ||
| if (e instanceof NoSuchColumnFamilyException) { | ||
|
|
@@ -349,7 +326,7 @@ private void testReversedScan(int start, boolean startInclusive, int stop, boole | |
| IntStream.range(0, count).forEach(i -> assertResultEquals(results.get(i), actualStart - i)); | ||
| } | ||
|
|
||
| @Test | ||
| @TestTemplate | ||
| public void testScanWithStartKeyAndStopKey() throws Exception { | ||
| testScan(1, true, 998, false, -1); // from first region to last region | ||
| testScan(123, true, 345, true, -1); | ||
|
|
@@ -358,7 +335,7 @@ public void testScanWithStartKeyAndStopKey() throws Exception { | |
| testScan(456, false, 678, false, -1); | ||
| } | ||
|
|
||
| @Test | ||
| @TestTemplate | ||
| public void testReversedScanWithStartKeyAndStopKey() throws Exception { | ||
| testReversedScan(998, true, 1, false, -1); // from last region to first region | ||
| testReversedScan(543, true, 321, true, -1); | ||
|
|
@@ -367,23 +344,23 @@ public void testReversedScanWithStartKeyAndStopKey() throws Exception { | |
| testReversedScan(876, false, 654, false, -1); | ||
| } | ||
|
|
||
| @Test | ||
| @TestTemplate | ||
| public void testScanAtRegionBoundary() throws Exception { | ||
| testScan(222, true, 333, true, -1); | ||
| testScan(333, true, 444, false, -1); | ||
| testScan(444, false, 555, true, -1); | ||
| testScan(555, false, 666, false, -1); | ||
| } | ||
|
|
||
| @Test | ||
| @TestTemplate | ||
| public void testReversedScanAtRegionBoundary() throws Exception { | ||
| testReversedScan(333, true, 222, true, -1); | ||
| testReversedScan(444, true, 333, false, -1); | ||
| testReversedScan(555, false, 444, true, -1); | ||
| testReversedScan(666, false, 555, false, -1); | ||
| } | ||
|
|
||
| @Test | ||
| @TestTemplate | ||
| public void testScanWithLimit() throws Exception { | ||
| testScan(1, true, 998, false, 900); // from first region to last region | ||
| testScan(123, true, 234, true, 100); | ||
|
|
@@ -392,7 +369,7 @@ public void testScanWithLimit() throws Exception { | |
| testScan(456, false, 678, false, 100); | ||
| } | ||
|
|
||
| @Test | ||
| @TestTemplate | ||
| public void testScanWithLimitGreaterThanActualCount() throws Exception { | ||
| testScan(1, true, 998, false, 1000); // from first region to last region | ||
| testScan(123, true, 345, true, 200); | ||
|
|
@@ -401,7 +378,7 @@ public void testScanWithLimitGreaterThanActualCount() throws Exception { | |
| testScan(456, false, 678, false, 200); | ||
| } | ||
|
|
||
| @Test | ||
| @TestTemplate | ||
| public void testReversedScanWithLimit() throws Exception { | ||
| testReversedScan(998, true, 1, false, 900); // from last region to first region | ||
| testReversedScan(543, true, 321, true, 100); | ||
|
|
@@ -410,7 +387,7 @@ public void testReversedScanWithLimit() throws Exception { | |
| testReversedScan(876, false, 654, false, 100); | ||
| } | ||
|
|
||
| @Test | ||
| @TestTemplate | ||
| public void testReversedScanWithLimitGreaterThanActualCount() throws Exception { | ||
| testReversedScan(998, true, 1, false, 1000); // from last region to first region | ||
| testReversedScan(543, true, 321, true, 200); | ||
|
|
@@ -419,7 +396,7 @@ public void testReversedScanWithLimitGreaterThanActualCount() throws Exception { | |
| testReversedScan(876, false, 654, false, 200); | ||
| } | ||
|
|
||
| @Test | ||
| @TestTemplate | ||
| public void testScanEndingEarly() throws Exception { | ||
| testScan(1, true, 998, false, 0, 900); // from first region to last region | ||
| testScan(123, true, 234, true, 0, 100); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
methodNameis set to the Java method name only. With@TestTemplateparameterized invocations, multiple runs of the same test method will typically share this value, which can cause span name collisions (e.g.,waitForSpan(hasName(methodName))may match a span from a different invocation). Consider usingtestInfo.getDisplayName()(or storing both method name + display name) and using that consistently forTraceUtil.trace(...)and span matchers so each invocation has a unique parent span name.