Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Substitute TransferableBlock with MseBlock #15245

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

gortiz
Copy link
Contributor

@gortiz gortiz commented Mar 11, 2025

Introduction

This is a large mechanical PR that replaces TransferableBlock with a new MseBlock. I’ve wanted to apply this refactor for over a year but postponed it for other reasons. Now, with the need to add multi-stage stats on failures, implementing that change within TransferableBlock proved too complex. As a result, I also redesigned how stats are communicated between operators.

From TransferableBlock to MseBlock

TransferableBlock tried to model too many different things at once. It had three logical types:

  • Data blocks (containing actual data)
  • Successful End-of-stream (EOS) blocks (signaling completion)
  • Erroneous End-of-stream (EOS) blocks (indicating failures)

Additionally, blocks could be in either a serialized or deserialized form, significantly increasing complexity. A single implementation had to support all cases, leading to an API where many methods were illegal in certain contexts. This made it difficult to determine when a method was safe to call.

For instance, TransferableBlock was designed to store stats only in EOS blocks, but the API allowed stats in other block types, potentially breaking invariants.

MseBlock: A More Structured Approach

The new MseBlock follows a different design principle. Instead of a single class handling all cases, we now have separate interfaces for each block type. The actual methods are defined in these interfaces, reducing ambiguity.

For example, MseBlock itself does not have a getNumRows() method. Instead, that method is part of MseBlock.Data. This forces callers to explicitly cast their MseBlock to the appropriate type, making the API safer.

While this may initially seem more restrictive, it actually simplifies development. Previously, developers had to rely on context to determine whether a method could be called. Now, the Java compiler enforces correctness, reducing potential errors.

New MseBlock Structure

classDiagram
    class MseBlock {
        <<interface>>
        +boolean isData()
        +boolean isEos()
        +boolean isSuccess()
        +boolean isError()
    }
    
    class MseBlock.Data {
        <<interface>>
        +int getNumRows()
        +DataSchema getDataSchema()
        +RowHeapDataBlock asRowHeap()
        +SerializedDataBlock asSerialized()
        +boolean isRowHeap()
        +boolean isSerialized()
    }

    class MseBlock.Eos {
        <<interface>>
    }

    class RowHeapDataBlock {
        -DataSchema _dataSchema
        -List<Object[]> _rows
        -AggregationFunction[] _aggFunctions
        +List<Object[]> getRows()
        +AggregationFunction[] getAggFunctions()
    }

    class SerializedDataBlock {
        -DataBlock _dataBlock
        +DataSchema getDataSchema()
    }

    class SuccessMseBlock {
    }

    class ErrorMseBlock {
        -EnumMap<QueryErrorCode, String> _errorMessages
        +Map<QueryErrorCode, String> getErrorMessages()
    }

    MseBlock <|-- MseBlock.Data
    MseBlock <|-- MseBlock.Eos
    MseBlock.Data <|-- RowHeapDataBlock
    MseBlock.Data <|-- SerializedDataBlock
    MseBlock.Eos <|-- SuccessMseBlock
    MseBlock.Eos <|-- ErrorMseBlock
Loading

Currently, this structure requires explicit casting (or use the visitor pattern), but once Pinot upgrades to Java 17 or 21, we can use sealed classes and pattern matching, further simplifying the API.

A New Way to Communicate Stats

Another key design change is that MseBlock never stores stats.

In TransferableBlock, stats were added as a workaround inside DataBlock, making the API more complex. This also polluted MultiStageOperator.nextBlock with unnecessary logic for collecting stats.

MseBlock, instead, does not contain stats at all. MultiStageOperator.nextBlock() now only deals with data blocks, EOS blocks, and error blocks. Instead of storing stats within blocks, we compute them separately using a new method:

calculateStats() Workflow

  1. Call calculateStats() on each input operator to retrieve upstream stats. If no inputs exist, start with empty stats.
  2. Merge upstream stats.
  3. Add stats from the current operator.

The exception is MailboxReceivaeOperator, which cannot directly call calculateStats() on upstream operators in other
stages (as they may be running in separate JVMs). Instead, stats are sent through SendingMailbox, packaged with EOS blocks.

How Stats Are Transmitted

  • When a MailboxSendOperator receives an EOS or error block, it calls calculateStats() and calls Exchange.send(MseBlock.Eos, Stats)
  • Exchange ends up calling SendingMailbox.send(MseBlock.Eos, Stats), whose implementation depends on the transport layer.
    • In the gRPC implementation, this is packaged into the same DataBlock format as before.
    • In the memory implementation, we directly add the pair (EOS block, Stats) into the ReceivingMailbox queue.
  • When ReceivingMailbox reads messages, it extracts the stats, which are then retrieved when
    MailboxReceiveOperator.calculateStats() is called.

This system ensures transparent multi-stage stat propagation while keeping MseBlock clean. It also shows how the new MseBlock API can be used to enforce a cleaner design. The new SendingMailbox and Exchange methods define their restrictions using Java typesystem:

  • Data blocks are never sent with stats
  • EOS blocks are always sent with stats (even if empty)

Summary of Benefits

  • Simplified API: MseBlock eliminates ambiguous method calls and enforces correctness at the compiler level.
  • Cleaner Stats Handling: Stats are no longer a hack inside data blocks, reducing unnecessary complexity.
  • Better Type Safety: Separate interfaces prevent illegal operations on blocks.
  • Future-Proof Design: Once Pinot upgrades to Java 17/21, pattern matching and sealed classes will further improve usability.

@gortiz gortiz marked this pull request as draft March 11, 2025 10:34
@codecov-commenter
Copy link

codecov-commenter commented Mar 11, 2025

Codecov Report

Attention: Patch coverage is 0% with 544 lines in your changes missing coverage. Please review.

Project coverage is 34.17%. Comparing base (59551e4) to head (758c418).
Report is 1835 commits behind head on master.

Files with missing lines Patch % Lines
...g/apache/pinot/query/mailbox/ReceivingMailbox.java 0.00% 41 Missing ⚠️
...me/operator/utils/BlockingMultiStreamConsumer.java 0.00% 30 Missing ⚠️
...apache/pinot/query/mailbox/GrpcSendingMailbox.java 0.00% 27 Missing ⚠️
...inot/query/runtime/operator/AggregateOperator.java 0.00% 27 Missing ⚠️
...runtime/plan/pipeline/PipelineBreakerOperator.java 0.00% 27 Missing ⚠️
...pinot/query/runtime/operator/BaseJoinOperator.java 0.00% 26 Missing ⚠️
...query/runtime/operator/exchange/BlockExchange.java 0.00% 26 Missing ⚠️
...ot/query/runtime/operator/MailboxSendOperator.java 0.00% 24 Missing ⚠️
...ache/pinot/query/runtime/blocks/BlockSplitter.java 0.00% 21 Missing ⚠️
...pinot/query/runtime/plan/MultiStageQueryStats.java 0.00% 20 Missing ⚠️
... and 33 more

❗ There is a different number of reports uploaded between BASE (59551e4) and HEAD (758c418). Click for more details.

HEAD has 6 uploads less than BASE
Flag BASE (59551e4) HEAD (758c418)
skip-bytebuffers-false 7 6
unittests 5 3
unittests1 2 0
java-11 5 4
Additional details and impacted files
@@              Coverage Diff              @@
##             master   #15245       +/-   ##
=============================================
- Coverage     61.75%   34.17%   -27.58%     
- Complexity      207      687      +480     
=============================================
  Files          2436     2775      +339     
  Lines        133233   156261    +23028     
  Branches      20636    23944     +3308     
=============================================
- Hits          82274    53399    -28875     
- Misses        44911    98593    +53682     
+ Partials       6048     4269     -1779     
Flag Coverage Δ
custom-integration1 100.00% <ø> (+99.99%) ⬆️
integration 100.00% <ø> (+99.99%) ⬆️
integration1 100.00% <ø> (+99.99%) ⬆️
integration2 0.00% <ø> (ø)
java-11 34.15% <0.00%> (-27.55%) ⬇️
java-21 34.17% <0.00%> (-27.46%) ⬇️
skip-bytebuffers-false 34.16% <0.00%> (-27.58%) ⬇️
skip-bytebuffers-true 34.16% <0.00%> (+6.44%) ⬆️
temurin 34.17% <0.00%> (-27.58%) ⬇️
unittests 34.16% <0.00%> (-27.58%) ⬇️
unittests1 ?
unittests2 34.16% <0.00%> (+6.43%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@gortiz gortiz force-pushed the mse-errors-with-stats branch 6 times, most recently from 94d7edc to 7c89b43 Compare March 11, 2025 12:57
@gortiz gortiz requested a review from ankitsultana March 11, 2025 13:46
@gortiz gortiz force-pushed the mse-errors-with-stats branch from 7c89b43 to 1792831 Compare March 11, 2025 13:53
@gortiz gortiz force-pushed the mse-errors-with-stats branch 3 times, most recently from cc3b2cd to 479a44a Compare March 12, 2025 10:08
@gortiz gortiz force-pushed the mse-errors-with-stats branch from 479a44a to 758c418 Compare March 12, 2025 10:41
/// Pinot source code is migrated to Java 17 or newer, specially in Java 21 where pattern matching can also be used,
/// removing the need for the [Visitor] pattern.
///
/// Meanwhile, the API force callers to do some castings, but it is a trade-off to have a more robust and maintainable
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo: relay on -> rely on .

/// Meanwhile, the API force callers to do some castings, but it is a trade-off to have a more robust and maintainable
/// codebase given that we can relay on Java typesystem to verify some important properties at compile time instead of
/// adding runtime checks.
///
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

alternative of -> alternative to

/// A block that contains data in row heap format.
///
/// This class is a subclass of [MseBlock.Data] and is used to store data in row heap format.
/// This is probably the less efficient way to store data, but it is also the easiest to work with.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

everytime -> every time . As the day this comment was written -> At the time of writing (the comment)

@Nullable
@SuppressWarnings("rawtypes")
private final AggregationFunction[] _aggFunctions;

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'd be good to document _aggFunctions .

// Keep reading the input blocks until we find a match row or all blocks are processed.
// TODO: Consider batching the rows to improve performance.
while (true) {
TransferableBlock block = _input.nextBlock();
if (block.isErrorBlock()) {
MseBlock block = _input.nextBlock();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this correct ?

@@ -176,16 +174,25 @@ protected TransferableBlock getNextBlock()
// Terminate when receiving exception block
Map<Integer, String> exceptions = _exceptions;
if (exceptions != null) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This block is used twice so it could be made into a regular or static constructor in ErrorMseBlock.

@@ -156,15 +156,15 @@ public Void visitJoin(JoinNode node, ServerPlanRequestContext context) {
if (visit(left, context)) {
PipelineBreakerResult pipelineBreakerResult = context.getPipelineBreakerResult();
int resultMapId = pipelineBreakerResult.getNodeIdMap().get(right);
List<TransferableBlock> transferableBlocks =
pipelineBreakerResult.getResultMap().getOrDefault(resultMapId, Collections.emptyList());
List<MseBlock> blocks = pipelineBreakerResult.getResultMap().getOrDefault(resultMapId, Collections.emptyList());
List<Object[]> resultDataContainer = new ArrayList<>();
DataSchema dataSchema = right.getDataSchema();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this loop receive column-based block ?

@@ -244,10 +245,10 @@ public void processQuery(WorkerMetadata workerMetadata, StagePlan stagePlan, Map

// Send error block to all the receivers if pipeline breaker fails
if (pipelineBreakerResult != null && pipelineBreakerResult.getErrorBlock() != null) {
TransferableBlock errorBlock = pipelineBreakerResult.getErrorBlock();
ErrorMseBlock errorBlock = pipelineBreakerResult.getErrorBlock();
int stageId = stageMetadata.getStageId();
LOGGER.error("Error executing pipeline breaker for request: {}, stage: {}, sending error block: {}", requestId,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't this just use default version of toString() method, producing useless message ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants