Add support for orphan segment cleanup #15142

Open
wants to merge 12 commits into base: master

Conversation

9aman
Contributor

@9aman 9aman commented Feb 27, 2025

Context

Follow-up to PR #15048.
The first PR ensured that we don't miss any segments created from now on.

This PR aims to clean up orphan segments: segments that are present in deepstore and have passed the retention time but are present in neither ZK nor the IdealState.

Scope of the PR.

The PR aims to handle two scenarios:

  1. Old segments that did not get deleted from the deepstore but their SegmentZKMetadata was deleted by the retention manager. This happened due to the issue mentioned and resolved in: Ensure SegmentDeletionManager deletes deepstore files created by BaseMultipleSegmentsConversionExecutor #15048
  2. The deletion by the retention manager is performed as follows:
    • Find all the segments present in ZK (i.e. those that have a ZK metadata entry)
    • Delete the segment from IS
    • Delete the segment from ZK
    • Delete the segment from deepstore
      If a failure (e.g. a controller restart) occurs between the last two steps, the RetentionManager cannot delete the segment in subsequent runs, because the segment no longer has a ZK metadata entry.
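
The failure window described above leaves files in deepstore that have no ZK metadata. The selection rule can be sketched roughly as follows. This is only an illustration: it assumes a plain map of deepstore file names to last-modified times in place of Pinot's PinotFS listing, and `OrphanSegmentFinder` and `findOrphans` are hypothetical names, not the PR's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical helper, not part of Pinot: illustrates the selection rule only.
final class OrphanSegmentFinder {
  private OrphanSegmentFinder() {
  }

  /**
   * Returns deepstore segments that have no ZK metadata entry and whose
   * last-modified time is older than the retention window.
   */
  static List<String> findOrphans(Map<String, Long> deepstoreMtimeMs, Set<String> segmentsInZk,
      long retentionMs, long nowMs) {
    List<String> orphans = new ArrayList<>();
    // The TreeMap copy gives a deterministic iteration order for the example.
    for (Map.Entry<String, Long> entry : new TreeMap<>(deepstoreMtimeMs).entrySet()) {
      boolean tracked = segmentsInZk.contains(entry.getKey());
      boolean pastRetention = nowMs - entry.getValue() > retentionMs;
      if (!tracked && pastRetention) {
        orphans.add(entry.getKey());
      }
    }
    return orphans;
  }
}
```

Segments that still have ZK metadata are skipped here because the regular retention path, which uses the segment end time, already handles them.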

Testing

  • Unit Tests were added to ensure that segments present in deepstore that are missing in ZK are picked for deletion.

Update

After analyzing the test results:

https://docs.google.com/document/d/1ZvNefSsRL716NspQc1f5VrRYjd9CSqocac5TIfLJWPA/edit?usp=sharing

I concluded that limiting the number of segments deleted in a single run would be beneficial. To address this, I have introduced a configurable batch size for segment deletion (untrackedSegmentsDeletionBatchSize, part of SegmentsValidationAndRetentionConfig).

Reasons for This Change

Deletion

  • Prevent starvation of deepstore cleanup: ensures that the RetentionManager can also delete files for other tables without delays.
  • Avoid blocking API-based segment deletions:
    • Some APIs rely on the same code for deleting segments from deepstore.
    • The API will respond, but the deletion will stay blocked.
  • The segment lineage update function performs deepstore cleanup at the end. The function itself will not block, since it only submits the request to a single-threaded executor service, but the deepstore cleanup will stay blocked.
  • UpsertCompactionTaskGenerator and UpsertCompactMergeTaskGenerator also use the same code, which runs on a single-threaded executor:
  protected void deleteSegmentsWithDelay(String tableName, Collection<String> segmentIds,
      Long deletedSegmentsRetentionMs, long deletionDelaySeconds) {
    _executorService.schedule(new Runnable() {
      @Override
      public void run() {
        deleteSegmentFromPropertyStoreAndLocal(tableName, segmentIds, deletedSegmentsRetentionMs, deletionDelaySeconds);
      }
    }, deletionDelaySeconds, TimeUnit.SECONDS);
  }


protected synchronized void deleteSegmentFromPropertyStoreAndLocal
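
To illustrate why one large deletion can hold up every other deletion, here is a small stand-alone sketch that uses only `java.util.concurrent`. It is not Pinot code: `SingleThreadDeletionDemo`, the table names, and the sleep standing in for a long deepstore cleanup are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: tasks on a single-threaded executor run strictly one at a time.
final class SingleThreadDeletionDemo {
  private SingleThreadDeletionDemo() {
  }

  static List<String> runDemo(long bulkDeletionMs) {
    List<String> events = Collections.synchronizedList(new ArrayList<>());
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    // A large orphan cleanup for table A occupies the only worker thread.
    executor.schedule(() -> {
      events.add("tableA bulk cleanup started");
      sleepQuietly(bulkDeletionMs);
      events.add("tableA bulk cleanup finished");
    }, 0, TimeUnit.SECONDS);
    // A small deletion for table B is submitted right away but has to wait its turn.
    executor.schedule(() -> events.add("tableB deletion ran"), 0, TimeUnit.SECONDS);
    executor.shutdown();
    try {
      executor.awaitTermination(bulkDeletionMs + 5_000, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return events;
  }

  private static void sleepQuietly(long ms) {
    try {
      Thread.sleep(ms);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) {
    runDemo(200).forEach(System.out::println);
  }
}
```

In this sketch the table B deletion is recorded only after the table A cleanup has finished, which is the starvation effect a batch size is meant to bound.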

@codecov-commenter

codecov-commenter commented Feb 27, 2025

Codecov Report

Attention: Patch coverage is 87.50000% with 9 lines in your changes missing coverage. Please review.

Project coverage is 63.68%. Comparing base (59551e4) to head (6894d6b).
Report is 1837 commits behind head on master.

Files with missing lines Patch % Lines
...troller/helix/core/retention/RetentionManager.java 88.33% 4 Missing and 3 partials ⚠️
.../controller/helix/core/SegmentDeletionManager.java 0.00% 1 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #15142      +/-   ##
============================================
+ Coverage     61.75%   63.68%   +1.92%     
- Complexity      207     1461    +1254     
============================================
  Files          2436     2772     +336     
  Lines        133233   156306   +23073     
  Branches      20636    23988    +3352     
============================================
+ Hits          82274    99536   +17262     
- Misses        44911    49283    +4372     
- Partials       6048     7487    +1439     
Flag Coverage Δ
custom-integration1 100.00% <ø> (+99.99%) ⬆️
integration 100.00% <ø> (+99.99%) ⬆️
integration1 100.00% <ø> (+99.99%) ⬆️
integration2 0.00% <ø> (ø)
java-11 63.65% <87.50%> (+1.94%) ⬆️
java-21 63.57% <87.50%> (+1.94%) ⬆️
skip-bytebuffers-false 63.67% <87.50%> (+1.92%) ⬆️
skip-bytebuffers-true 63.50% <87.50%> (+35.77%) ⬆️
temurin 63.68% <87.50%> (+1.92%) ⬆️
unittests 63.67% <87.50%> (+1.92%) ⬆️
unittests1 56.16% <100.00%> (+9.27%) ⬆️
unittests2 34.25% <87.50%> (+6.52%) ⬆️


@9aman 9aman force-pushed the deeptore_cleanup_for_historical_segments branch from a14c5bc to d2d1bcf Compare February 27, 2025 09:28
@@ -124,8 +138,19 @@ private void manageRetentionForTable(TableConfig tableConfig) {
}

private void manageRetentionForOfflineTable(String offlineTableName, RetentionStrategy retentionStrategy) {
List<SegmentZKMetadata> segmentZKMetadataList = _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName);
Contributor Author

Segment metadata is added to ZK before the segment is added to the IdealState.
Segment metadata is removed from ZK after the segment has been removed from the IdealState.

The segment list from ZK will therefore always be a superset of the segments in the IdealState, so I have not used IS to filter out segments that are still in IS.

Contributor

Can we not just rely on the ideal state as the source of truth, since that would avoid reading all of the segment ZK metadata? We are going to check the retention window as well prior to deletion (if the concern is deleting a new segment that just got added, which can happen even when the segment is not in ZK metadata but only in deep store).

Contributor Author

The deletion occurs based on the endTime present in the segmentZKMetadata. That's why the isPurgeable relies on segmentZKMetadata to decide whether we need to remove a segment or not.
Are you suggesting some other approach where we move away from endTime of the segment?

Contributor

Could it happen that a segment's ZK metadata is still in ZK but the segment is no longer in IS?

Contributor

I initially did not realize that we already loop through all of the segment ZK metadata to check whether a segment is purgeable (retentionStrategy.isPurgeable).
Yes, it's possible for a segment to be in ZK metadata but not in IS, @klsince, but we want to handle that case as well, right?

Contributor Author

If the deletion fails midway, the segment will be in ZK but not in IS.
Moreover, as per Xiang, we have seen cases where tasks such as SegmentRefreshTask push the segment to deepstore and ZK but fail to update the IS.
Such segments exist in that case as well.


long lastModifiedTime = fileMetadata.getLastModifiedTime();

if (retentionStrategy.isPurgeable(segmentName, offlineTableName, lastModifiedTime)) {
Contributor Author

This ensures that a segment that has no entry in ZK but has been uploaded to deepstore does not get deleted.

}

String segmentName = extractSegmentName(fileMetadata.getFilePath());
if (segmentName == null || segmentsToExclude.contains(segmentName)) {
Contributor Author

When the segment ZK metadata is present, retention relies on the endTime in the ZK metadata.
This function instead relies on the file modification time to decide whether a segment should be purged.

@9aman 9aman force-pushed the deeptore_cleanup_for_historical_segments branch from 2fd0b0e to 4060631 Compare February 27, 2025 11:17

* @param endTimeMs The end time of the segment in milliseconds
* @return Whether the segment should be purged
*/
boolean isPurgeable(String segmentName, String tableNameWithType, long endTimeMs);
Contributor

Suggest keeping this method local to SegmentDeletionManager, since endTime is supplied from outside. It will be confusing what needs to be passed in (file modification time, segment time, etc.). The caller can get the retention time and do its own checks?

Contributor Author

@9aman 9aman Mar 2, 2025

Yes, but I would still keep it as part of the RetentionManager. I feel that the SegmentDeletionManager should only have the job of deleting (moving to the Deleted_Segments dir) and should not know which segments need to be deleted.

For our case, the RetentionManager can fetch the segments from deepstore and decide based on the file modification time whether they need to be deleted or not.

Contributor Author

On a second pass, I think it's fine to have this.
One can pass the endTimeMs that is to be compared against System.currentTimeMillis() - retentionMS, i.e.
System.currentTimeMillis() - retentionMS > endTimeMs.

It's a more generic implementation of the

public boolean isPurgeable(String tableNameWithType, SegmentZKMetadata segmentZKMetadata)

Where one only needs to rely on endTime present in segmentZKMetadata
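
The relationship between the two overloads can be sketched as below. This is a simplified stand-in rather than Pinot's RetentionStrategy: `TimeRetentionSketch` and `SegmentMeta` are hypothetical names, and the current time is passed in as `nowMs` (instead of calling System.currentTimeMillis()) only to keep the example deterministic.

```java
// Hypothetical stand-ins; Pinot's RetentionStrategy and SegmentZKMetadata carry more than this.
final class TimeRetentionSketch {
  private final long _retentionMs;

  TimeRetentionSketch(long retentionMs) {
    _retentionMs = retentionMs;
  }

  // Generic check: the caller decides whether timeMs is a segment end time or a file mtime.
  boolean isPurgeable(long timeMs, long nowMs) {
    return nowMs - timeMs > _retentionMs;
  }

  // Metadata-based check expressed through the generic one.
  boolean isPurgeable(SegmentMeta meta, long nowMs) {
    return isPurgeable(meta._endTimeMs, nowMs);
  }

  // Minimal placeholder for segment metadata: a name and an end time.
  static final class SegmentMeta {
    final String _name;
    final long _endTimeMs;

    SegmentMeta(String name, long endTimeMs) {
      _name = name;
      _endTimeMs = endTimeMs;
    }
  }
}
```

With this shape, the deepstore path can pass a file modification time while the ZK path keeps passing the segment end time, and both share one comparison.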

Contributor Author

Changed as per @klsince's suggestions.

@9aman 9aman marked this pull request as ready for review February 28, 2025 07:42
@@ -221,6 +221,7 @@ private void deleteSegmentMetadataFromStore(PinotFS pinotFS, URI segmentFileUri,
URI segmentMetadataUri = SegmentPushUtils.generateSegmentMetadataURI(segmentFileUri.toString(), segmentId);
if (pinotFS.exists(segmentMetadataUri)) {
LOGGER.info("Deleting segment metadata {} from {}", segmentId, segmentMetadataUri);
// TODO: check if the deletion was successful and add a warning here.
Contributor

Is this TODO supposed to check the return value of the delete() method and log a warning if it is false? If so, maybe we can just add it in this PR.

Contributor Author

Done.
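
The check discussed in this thread might look roughly like the sketch below. It uses `java.nio.file.Files.deleteIfExists` and `java.util.logging` as stand-ins for the PinotFS call and the controller's logger, so `DeleteWithWarning` and `deleteAndWarn` are hypothetical and not the code that was added to the PR.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.logging.Logger;

// Hypothetical sketch: act on the boolean result of a delete instead of ignoring it.
final class DeleteWithWarning {
  private static final Logger LOGGER = Logger.getLogger(DeleteWithWarning.class.getName());

  private DeleteWithWarning() {
  }

  static boolean deleteAndWarn(Path file) {
    try {
      boolean deleted = Files.deleteIfExists(file);
      if (!deleted) {
        // deleteIfExists returns false when there was nothing to delete.
        LOGGER.warning("Could not delete " + file + " (file was not present)");
      }
      return deleted;
    } catch (IOException e) {
      LOGGER.warning("Failed to delete " + file + ": " + e.getMessage());
      return false;
    }
  }
}
```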

@@ -77,6 +88,7 @@ protected void processTable(String tableNameWithType) {
return;
}

// Did not understand
Contributor

remove?

Contributor Author

Done. Thanks for pointing it out.


segmentsToDelete.add(segmentName);
}
}

Contributor

Add an INFO log about how many segments will be deleted.

Contributor Author

Done.

PinotFS pinotFS = PinotFSFactory.create(tableDataUri.getScheme());

List<FileMetadata> deepstoreFiles = pinotFS.listFilesWithMetadata(tableDataUri, false);

Contributor

Maybe print an INFO log here with the number of segments found and the timestamp; together with another INFO log after the for-loop, it can be used to calculate how long it takes to figure out which files to delete.

Contributor Author

Done.
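
The timing logs suggested in this thread can be sketched as follows. The example lists a local directory with `java.nio.file.Files.list` as a stand-in for the deepstore listing; `TimedListing` and `listWithTiming` are hypothetical names, and the log wording is only illustrative.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch: record how many files a listing returned and how long it took.
final class TimedListing {
  private static final Logger LOGGER = Logger.getLogger(TimedListing.class.getName());

  private TimedListing() {
  }

  static List<Path> listWithTiming(Path dir) {
    long startMs = System.currentTimeMillis();
    List<Path> files;
    // Files.list returns a lazy stream that must be closed, hence try-with-resources.
    try (Stream<Path> stream = Files.list(dir)) {
      files = stream.collect(Collectors.toList());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    long elapsedMs = System.currentTimeMillis() - startMs;
    LOGGER.info("Found " + files.size() + " files in " + dir + " in " + elapsedMs + " ms");
    return files;
  }
}
```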

@@ -50,4 +50,17 @@ public boolean isPurgeable(String tableNameWithType, SegmentZKMetadata segmentZK

return System.currentTimeMillis() - endTimeMs > _retentionMs;
}

@Override
public boolean isPurgeable(String segmentName, String tableNameWithType, long endTimeMs) {
Contributor

  1. better move tableNameWithType as the first param for consistency
  2. can reuse this method to implement the method above
  3. perhaps call endTimeMs segmentTimeMs to be generic? and leave a comment that segmentTimeMs can be endTime or mtime etc. to be decided by the caller

Contributor Author

Done

URI tableDataUri = URIUtils.getUri(_pinotHelixResourceManager.getDataDir(), rawTableName);
PinotFS pinotFS = PinotFSFactory.create(tableDataUri.getScheme());

List<FileMetadata> deepstoreFiles = pinotFS.listFilesWithMetadata(tableDataUri, false);
Contributor

Can we add a metric for the count of files that are in deep store but not in segment ZK metadata? This is to see how many dangling files are in the deep store.

Contributor Author

Done


@9aman 9aman force-pushed the deeptore_cleanup_for_historical_segments branch from 5cdcfef to abac218 Compare March 7, 2025 09:53

List<FileMetadata> deepstoreFiles = pinotFS.listFilesWithMetadata(tableDataUri, false);
long listEndTimeMs = System.currentTimeMillis();
LOGGER.info("Found: {} segments in deepstore for table: {}. Time taken to list segments: {} ms",
Contributor

nit: Found ... in {} ms to be short

}
}
long endTimeMs = System.currentTimeMillis();
LOGGER.info(
Contributor

Perhaps combine the two INFO logs into one, e.g. "Found {} segments ... have no corresponding ZK metadata, in {} ms"

Contributor Author

Done

} catch (IOException e) {
LOGGER.warn("Unable to fetch segments from deep store that are beyond retention period for table: {}",
realtimeTableName);
}
Contributor

Make a helper method from L241-L258 and use it for both manageRetentionForOfflineTable and manageRetentionForRealtimeTable?

Contributor Author

Done

@9aman 9aman force-pushed the deeptore_cleanup_for_historical_segments branch 2 times, most recently from a77116a to 168a771 Compare March 10, 2025 07:39
@9aman 9aman force-pushed the deeptore_cleanup_for_historical_segments branch from 168a771 to e3b7d7e Compare March 12, 2025 08:40
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.UNTRACKED_SEGMENTS_COUNT,
segmentsToDelete.size());

if (segmentsToDelete.size() > untrackedSegmentsDeletionBatchSize) {
Contributor Author

Added the logic to pick as per the configured batch size.

cc: @swaminathanmanish @klsince

Contributor Author

The ideal way would be to paginate on the FS and fetch only untrackedSegmentsDeletionBatchSize files/segments from deepstore.
We can add this improvement later.
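
A minimal sketch of the batch selection, assuming the full list of untracked segments is already in memory (i.e. without the FS pagination mentioned above). `DeletionBatcher` and `pickBatch` are hypothetical names, and taking the first entries of the list is an assumption; the PR may choose the batch differently.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: caps the number of untracked segments handed to deletion in one run.
final class DeletionBatcher {
  private DeletionBatcher() {
  }

  static List<String> pickBatch(List<String> untrackedSegments, int batchSize) {
    // A non-positive batch size disables untracked-segment deletion in this sketch.
    if (batchSize <= 0) {
      return Collections.emptyList();
    }
    if (untrackedSegments.size() <= batchSize) {
      return new ArrayList<>(untrackedSegments);
    }
    // Take only the first batchSize entries; the rest are left for later runs.
    return new ArrayList<>(untrackedSegments.subList(0, batchSize));
  }
}
```

Segments left out of one batch remain in deepstore without ZK metadata, so a later run would find them again.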

List<SegmentZKMetadata> segmentZKMetadataList, int untrackedSegmentsDeletionBatchSize) {
List<String> segmentsToDelete = new ArrayList<>();

if (untrackedSegmentsDeletionBatchSize <= 0) {
Contributor Author

Logging and returning early when it's set to zero or a negative value, to avoid listing all the files in the deepstore.
