[fix](cloud) fix table and partition get_version #60064
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
Pull request overview
This PR fixes version caching issues in cloud mode for both table and partition version retrieval. The changes address how session variables are accessed and improve thread safety through volatile field declarations.
Changes:
- Updated CloudPartition and OlapTable to use volatile fields for cache timestamps and versions
- Modified version cache expiration checks to use ConnectContext-based session variable access
- Enhanced batch version retrieval to properly update caches
- Added missing waitForPendingTxn parameter to version requests
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| CloudPartitionTest.java | Updated tests to properly initialize ConnectContext with SessionVariable for instance-based access patterns |
| CloudPartition.java | Made cache fields volatile, changed method visibility for testing, updated cache expiration logic to use ConnectContext, added waitForPendingTxn parameter, optimized ArrayList capacity |
| OlapTable.java | Made cache fields volatile, changed cache comparison from > to >=, added cache updates in batch version retrieval, optimized with Collections.emptyList() |
fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java
d6e3a33 to 53d4bf6
Pull request overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.
Comments suppressed due to low confidence (2)
fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/CloudPartitionTest.java:63
- The ConnectContext set via ctx.setThreadLocalInfo() at line 51 is not cleaned up after the test completes. This could cause the thread-local context to leak into other tests. Add ConnectContext.remove() in a finally block or use @After/@AfterEach to ensure cleanup.
    @Test
    public void testIsCachedVersionExpired() {
        // Create ConnectContext with SessionVariable
        ConnectContext ctx = new ConnectContext();
        ctx.setSessionVariable(new SessionVariable());
        ctx.setThreadLocalInfo();
        // test isCachedVersionExpired
        CloudPartition part = createPartition(1, 2, 3);
        ctx.getSessionVariable().cloudPartitionVersionCacheTtlMs = 0;
        Assertions.assertTrue(part.isCachedVersionExpired());
        ctx.getSessionVariable().cloudPartitionVersionCacheTtlMs = -10086;
        part.setCachedVisibleVersion(2, 10086L); // update version and last cache time
        ctx.getSessionVariable().cloudPartitionVersionCacheTtlMs = 10000;
        Assertions.assertFalse(part.isCachedVersionExpired()); // not expired due to long expiration duration
        Assertions.assertEquals(2, part.getCachedVisibleVersion());
    }
fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/CloudPartitionTest.java:172
- The ConnectContext set via ctx.setThreadLocalInfo() at line 70 is not cleaned up after the test completes. This could cause the thread-local context to leak into other tests. Add ConnectContext.remove() in a finally block or use @After/@AfterEach to ensure cleanup, similar to how it's done in OlapTableTest.java at line 405.
    @Test
    public void testCachedVersion() throws RpcException {
        // Create ConnectContext with SessionVariable
        ConnectContext ctx = new ConnectContext();
        ctx.setSessionVariable(new SessionVariable());
        ctx.setThreadLocalInfo();
        CloudPartition part = createPartition(1, 2, 3);
        List<CloudPartition> parts = new ArrayList<>();
        for (long i = 0; i < 3; ++i) {
            parts.add(createPartition(5 + i, 5 + i, 5 + i));
        }
        Assertions.assertEquals(-1, part.getCachedVisibleVersion()); // not initialized FE side version
        // CHECKSTYLE OFF
        final ArrayList<Long> singleVersions = new ArrayList<>(Arrays.asList(2L, 3L, 4L, 5L));
        final ArrayList<ArrayList<Long>> batchVersions = new ArrayList<>(Arrays.asList(
                new ArrayList<>(Arrays.asList(1L, 2L, -1L)),
                new ArrayList<>(Arrays.asList(2L, 3L, -1L)),
                new ArrayList<>(Arrays.asList(3L, 4L, -1L)),
                new ArrayList<>(Arrays.asList(5L, -1L))
        ));
        final Integer[] callCount = {0};
        new MockUp<VersionHelper>(VersionHelper.class) {
            @Mock
            public Cloud.GetVersionResponse getVersionFromMeta(Cloud.GetVersionRequest req) {
                Cloud.GetVersionResponse.Builder builder = Cloud.GetVersionResponse.newBuilder();
                builder.setVersion(singleVersions.get(callCount[0]));
                builder.addAllVersions(batchVersions.get(callCount[0]));
                ++callCount[0];
                return builder.build();
            }
        };
        // CHECKSTYLE ON
        ctx.getSessionVariable().cloudPartitionVersionCacheTtlMs = -1; // disable cache
        {
            // test single get version
            Assertions.assertEquals(2, part.getVisibleVersion()); // should not get from cache
            Assertions.assertEquals(1, callCount[0]); // issue a rpc call to meta-service
            // test snapshot versions
            List<Long> versions = CloudPartition.getSnapshotVisibleVersion(parts); // should not get from cache
            Assertions.assertEquals(2, callCount[0]); // issue a rpc call to meta-service
            for (int i = 0; i < batchVersions.get(1).size(); ++i) {
                Long exp = batchVersions.get(1).get(i);
                if (exp == -1) {
                    exp = CloudPartition.PARTITION_INIT_VERSION;
                }
                Assertions.assertEquals(exp, versions.get(i));
            }
        }
        // enable change expiration and make it cached in long duration
        ctx.getSessionVariable().cloudPartitionVersionCacheTtlMs = 100000;
        {
            // test single get version
            Assertions.assertEquals(2, part.getVisibleVersion()); // cached version
            Assertions.assertEquals(2, callCount[0]); // issue a rpc call to meta-service
            // test snapshot versions
            List<Long> versions = CloudPartition.getSnapshotVisibleVersion(parts); // should not get from cache
            Assertions.assertEquals(2, callCount[0]); // issue a rpc call to meta-service
            for (int i = 0; i < batchVersions.get(1).size(); ++i) {
                Long exp = batchVersions.get(1).get(i);
                if (exp == -1) {
                    exp = CloudPartition.PARTITION_INIT_VERSION;
                }
                Assertions.assertEquals(exp, versions.get(i));
            }
        }
        // enable change expiration and make it expired
        ctx.getSessionVariable().cloudPartitionVersionCacheTtlMs = 500;
        try {
            Thread.sleep(550);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        // make some partition not expired, these partitions will not get version from meta-service
        CloudPartition hotPartition = parts.get(0);
        hotPartition.setCachedVisibleVersion(hotPartition.getCachedVisibleVersion(), 10086L);
        Assertions.assertEquals(2, hotPartition.getCachedVisibleVersion());
        Assertions.assertFalse(hotPartition.isCachedVersionExpired());
        Assertions.assertTrue(parts.get(1).isCachedVersionExpired());
        Assertions.assertTrue(parts.get(2).isCachedVersionExpired());
        {
            // test single get version
            Assertions.assertEquals(4, part.getVisibleVersion()); // should not get from cache
            Assertions.assertEquals(3, callCount[0]); // issue a rpc call to meta-service
            // test snapshot versions
            List<Long> versions = CloudPartition.getSnapshotVisibleVersion(parts); // should not get from cache
            Assertions.assertEquals(versions.size(), parts.size());
            Assertions.assertEquals(4, callCount[0]); // issue a rpc call to meta-service
            for (int i = 0; i < batchVersions.get(3).size(); ++i) {
                Long exp = batchVersions.get(3).get(i);
                if (exp == -1) {
                    exp = CloudPartition.PARTITION_INIT_VERSION;
                }
                Assertions.assertEquals(exp, versions.get(i + 1)); // exclude the first hot partition
            }
            // hot partition version not changed
            Assertions.assertEquals(2, hotPartition.getCachedVisibleVersion());
        }
    }
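Both suppressed comments ask for the thread-local context to be removed once the test finishes. A minimal sketch of that try/finally pattern follows. `FakeContext` and `ThreadLocalCleanupDemo` are hypothetical stand-ins written for this illustration, not Doris classes; they only mirror the `setThreadLocalInfo()` / `remove()` call shape named in the comments.

```java
// Hypothetical stand-in for a thread-local context holder (NOT Doris's ConnectContext).
final class FakeContext {
    private static final ThreadLocal<FakeContext> CURRENT = new ThreadLocal<>();

    long cacheTtlMs = 0;

    static FakeContext get() {
        return CURRENT.get();
    }

    void setThreadLocalInfo() {
        CURRENT.set(this);
    }

    static void remove() {
        CURRENT.remove();
    }
}

final class ThreadLocalCleanupDemo {
    // Installs a context for the duration of the body and always uninstalls it,
    // even when the body throws, so later tests on the same thread start clean.
    static void runWithContext(Runnable body) {
        FakeContext ctx = new FakeContext();
        ctx.setThreadLocalInfo();
        try {
            body.run();
        } finally {
            FakeContext.remove();
        }
    }

    public static void main(String[] args) {
        runWithContext(() -> FakeContext.get().cacheTtlMs = 10000);
        System.out.println(FakeContext.get() == null); // prints "true"
    }
}
```

In a JUnit test class the same cleanup could live in an `@AfterEach` method instead, which avoids repeating the try/finally in every test.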
fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java
|
run buildall |
TPC-H: Total hot run time: 31153 ms |
TPC-DS: Total hot run time: 173859 ms |
ClickBench: Total hot run time: 26.79 s |
|
run p0 |
FE Regression Coverage Report
Increment line coverage |
dataroaring left a comment
PR Review: fix table and partition get_version
Summary
This PR fixes several issues with version caching in cloud mode:
- Converts `cloudPartitionVersionCacheTtlMs` from static to instance variable
- Adds `volatile` for thread safety on cache fields
- Fixes cache update logic and adds missing RPC parameter
- Updates cache after batch version fetching
Analysis of Changes
1. [Good] Static to Instance Variable ✓
// Before
public static long cloudPartitionVersionCacheTtlMs = 0;
// After
public long cloudPartitionVersionCacheTtlMs = 0;

This enables per-session cache TTL configuration instead of a global setting.
2. [Good] Added volatile for Thread Safety ✓
// OlapTable
private volatile long lastTableVersionCachedTimeMs = 0;
private volatile long cachedTableVersion = -1;
// CloudPartition
private volatile long lastVersionCachedTimeMs = 0;

This ensures visibility of cache updates across threads.
3. [Bug Fix] Missing RPC Parameter ✓
// Before - waitForPendingTxns was ignored!
Cloud.GetVersionRequest request = Cloud.GetVersionRequest.newBuilder()
        .setRequestIp(...)
        .setDbId(...)
        // missing: .setWaitForPendingTxn(waitForPendingTxns)
        .build();
// After
Cloud.GetVersionRequest request = Cloud.GetVersionRequest.newBuilder()
        ...
        .setWaitForPendingTxn(waitForPendingTxns) // Now included
        .build();

This is a real bug fix - the waitForPendingTxns parameter was being passed to the method but never sent to the meta-service!
4. [Good] Cache Update After Batch Fetch ✓
List<Long> versions = getVisibleVersionFromMeta(dbIds, tableIds);
// update cache - NEW
Preconditions.checkState(tables.size() == versions.size());
for (int i = 0; i < tables.size(); i++) {
    tables.get(i).setCachedTableVersion(versions.get(i));
}

This ensures the cache is populated after fetching from meta-service, reducing future RPC calls.
5. [Good] Null Check for ConnectContext ✓
// Before - potential NPE
long cacheExpirationMs = SessionVariable.cloudPartitionVersionCacheTtlMs;
// After - safe
ConnectContext ctx = ConnectContext.get();
if (ctx == null) {
    return true; // expired if no context
}
long cacheExpirationMs = ctx.getSessionVariable().cloudPartitionVersionCacheTtlMs;

Issues Found
1. [Medium] Potential Race Condition in Cache Update
The volatile keyword provides visibility but doesn't protect the compound check-then-update operation:
protected void setCachedTableVersion(long version) {
    if (version >= cachedTableVersion) { // read
        cachedTableVersion = version; // write - race window here
        lastTableVersionCachedTimeMs = System.currentTimeMillis();
    }
}

Two threads could both pass the check and overwrite each other. However, since the version only increases monotonically, this is acceptable - both writes will set valid values. Worth documenting with a comment though.
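If the race ever needs closing rather than documenting, one option is an atomic max. With a plain check-then-write, a thread holding an older version can pass the check and then write after a newer version has been stored, which leaves the lower value in place until the next refresh. The sketch below uses `AtomicLong.accumulateAndGet` with `Math::max`; the class and method names are hypothetical, written for this illustration, and are not part of the PR.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration of a monotonic version cache (not code from the PR).
final class MonotonicVersionCache {
    private final AtomicLong cachedVersion = new AtomicLong(-1);
    private volatile long lastCachedTimeMs = 0;

    // Stores max(current, version) atomically, so a stale writer can never lower the value.
    void offer(long version) {
        long stored = cachedVersion.accumulateAndGet(version, Math::max);
        if (stored == version) {
            // The offered version is the stored value (new or unchanged), mirroring the >= check:
            // refresh the timestamp in that case only.
            lastCachedTimeMs = System.currentTimeMillis();
        }
    }

    long get() {
        return cachedVersion.get();
    }

    long lastCachedTimeMs() {
        return lastCachedTimeMs;
    }
}
```

The timestamp write is still separate from the version update, so this only guarantees that the version never moves backwards; whether that stronger guarantee is worth the extra object per table is a judgment call for the authors.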
2. [Low] Change from > to >= Needs Clarification
// Before
if (version > cachedTableVersion) {
// After
if (version >= cachedTableVersion) {

This allows refreshing the cache timestamp even when the version hasn't changed. If intentional, consider adding a comment:
// Use >= to refresh cache timestamp even if version unchanged,
// extending the cache validity period on repeated reads
if (version >= cachedTableVersion) {

3. [Low] Inconsistent Cache Expiration Check Pattern
CloudPartition.isCachedVersionExpired() checks lastVersionCachedTimeMs == 0:
if (lastVersionCachedTimeMs == 0) {
    return true;
}

But OlapTable.isCachedTableVersionExpired() checks cachedTableVersion == -1:

if (cachedTableVersion == -1) {
    return true;
}

Consider making the pattern consistent between the two classes.
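One way to make the two checks consistent would be a single shared helper that both classes call. The sketch below is hypothetical: `CacheExpiry`, its constant, and the rule that a non-positive TTL means "always expired" are assumptions made for illustration (the tests quoted earlier use `0` and `-1` that way), not the PR's actual logic.

```java
// Hypothetical shared helper (not from the PR); both classes would pass in their own fields.
final class CacheExpiry {
    // Assumption: 0 marks "never cached", matching the CloudPartition check above.
    static final long UNSET_TIME_MS = 0L;

    static boolean isExpired(long lastCachedTimeMs, long ttlMs, long nowMs) {
        if (ttlMs <= 0) {
            return true; // assumption: a non-positive TTL disables the cache
        }
        if (lastCachedTimeMs == UNSET_TIME_MS) {
            return true; // nothing has been cached yet
        }
        return nowMs - lastCachedTimeMs > ttlMs;
    }
}
```

Passing `nowMs` in explicitly keeps the helper deterministic, so tests would not need `Thread.sleep` to exercise the expired branch.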
Positive Aspects
- Bug fix for `setWaitForPendingTxn`: This was a real bug where the parameter was ignored
- Pre-sized ArrayList allocation: `new ArrayList<>(tables.size())` avoids resizing
- `Collections.emptyList()`: Better than `new ArrayList<>()` for empty results
- Test updates: Properly updated tests to use ConnectContext with session variables
- `@VisibleForTesting` annotations: Good practice for test helper methods
Verdict
Looks good to merge ✓
Minor suggestions:
- Add comment explaining why `>=` instead of `>` for cache update
- Consider documenting the race condition acceptability in `setCachedTableVersion`
- Consider making cache expiration check pattern consistent
The setWaitForPendingTxn bug fix alone makes this PR valuable.
run nonConcurrent |
What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
Release note
None
Check List (For Author)
Test
Behavior changed:
Does this need documentation?
Check List (For Reviewer who merge this PR)