Query thread context and cid #15180

Open
wants to merge 28 commits into
base: master

Contributor

@gortiz gortiz commented Mar 4, 2025

Thread context

This PR adds two new classes: QueryThreadContext and MseWorkerThreadContext. As their names imply, these classes hold context attached to the current thread. They include some information I consider useful, but we can discuss the exact contents if needed.

This opens up the possibility of changing the programming model. We could remove some parameters or attributes from contextual objects. For example, the request id is already present in several contextual objects we usually pass to other methods. With this change, these contextual objects could be removed or simplified as long as we assume a thread only executes a single query at a time, which seems a natural assumption. In this PR, I tried not to apply that change: contextual objects are not modified. I did that mainly because I want to keep the code clean, but also because it is difficult to be 100% sure that the thread-local object is initialized, especially in tests.

As explained in the Javadoc of both QueryThreadContext and MseWorkerThreadContext, these classes are internally a bit more complex than expected in order to offer a nice and, above all, safe API that simplifies their use. The context needs to be opened when the request is received, and then it may be copied into other threads as needed (for example, when the combine operator is parallelized per segment). The cleanest way to do this is to decorate an executor so that it copies the thread-local context from the parent thread. That is done in QueryThreadContext.contextAwareExecutorService (and the method of the same name in MseWorkerThreadContext). Given that we do not always control the thread being used, sometimes we need to add an explicit try-with-resources. This is the case in some of the framework hooks, for example, when Netty or gRPC calls our code.
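To make the open/close lifecycle described above more concrete, here is a minimal sketch of a thread-attached context guarded by try-with-resources. It is not the code in this PR: `ThreadContextSketch`, its `Context` fields and the `handleRequest` hook are hypothetical names chosen for illustration, and only the general shape (`open()` returning a closeable handle, `setIds`, `getCid`, `isInitialized`) mirrors the calls that appear in the diff.

```java
import java.util.Objects;

/** Hypothetical sketch of a thread-attached query context; not Pinot's actual class. */
final class ThreadContextSketch {
  /** Mutable per-query state attached to the current thread (fields are assumptions). */
  static final class Context {
    long requestId;
    String cid;
  }

  private static final ThreadLocal<Context> CURRENT = new ThreadLocal<>();

  /** Handle returned by open(); closing it detaches the context from the thread. */
  interface CloseableContext extends AutoCloseable {
    @Override
    void close(); // narrowed so callers do not have to handle a checked exception
  }

  static CloseableContext open() {
    if (CURRENT.get() != null) {
      throw new IllegalStateException("A context is already open on this thread");
    }
    CURRENT.set(new Context());
    return CURRENT::remove;
  }

  static boolean isInitialized() {
    return CURRENT.get() != null;
  }

  static void setIds(long requestId, String cid) {
    Context ctx = current();
    ctx.requestId = requestId;
    ctx.cid = Objects.requireNonNull(cid, "cid");
  }

  static String getCid() {
    return current().cid;
  }

  private static Context current() {
    Context ctx = CURRENT.get();
    if (ctx == null) {
      throw new IllegalStateException("No context is open on this thread");
    }
    return ctx;
  }

  /** Stands in for a framework hook (for example a Netty or gRPC callback). */
  static String handleRequest(long requestId, String cid) {
    try (CloseableContext ignored = open()) {
      setIds(requestId, cid);
      return "cid=" + getCid();
    }
  }

  public static void main(String[] args) {
    System.out.println(handleRequest(42L, "client-abc"));
    System.out.println("still initialized: " + isInitialized());
  }
}
```

In this sketch the handle removes the thread-local on close, so a pooled thread that is reused by another request cannot observe stale ids.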

The new thread context is also being used to set MDC instead of the executor I introduced last week (which I'm removing here). The MDC variables haven't changed.

CID

Another change in the PR is to define a new cid, which can be understood as client id or correlation id. The difference between this id and the request id is explained in the Javadoc, but the TL;DR is:

  1. Cid can optionally be set by clients. If they don't set it, the broker-assigned request id is used instead.
  2. Cid doesn't have to be unique. Clients can decide to use the same cid for different queries.
  3. Cid can be any string, while request ids are longs.
  4. Cid is constant for the whole query execution, while the request id is usually set in the broker but changes during the query lifecycle (e.g. hybrid tables, or MSE and its leaves).
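The fallback in point 1 can be sketched as follows. This is an illustration rather than the PR's code: `CidSketch` and `resolveCid` are hypothetical names, and treating an empty string the same as a missing cid is an assumption.

```java
/** Hypothetical sketch of the cid fallback rule; not the code in this PR. */
final class CidSketch {
  /**
   * Returns the client-supplied cid when present; otherwise falls back to the
   * broker-assigned request id rendered as a string.
   * Assumption: an empty string is treated the same as "not set".
   */
  static String resolveCid(String clientCid, long requestId) {
    if (clientCid == null || clientCid.isEmpty()) {
      return Long.toString(requestId);
    }
    return clientCid;
  }

  public static void main(String[] args) {
    // Two different queries may legitimately share one client-chosen cid.
    System.out.println(resolveCid("nightly-report", 1001L));
    System.out.println(resolveCid("nightly-report", 1002L));
    // Without a client cid, the request id is used instead.
    System.out.println(resolveCid(null, 1003L));
  }
}
```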

This cid is sent from brokers to servers in the Netty SSE protocol and the MSE worker (and gRPC-based) protocol. We still don't support setting the cid in all the different protocols in use. To be able to send the cid over SSE, I needed to modify the Thrift model to add this attribute, which meant regenerating some files. Since I'm using the latest Thrift version, the code of these classes changed a bit (adding @Override or some new methods).

The cid is also declared in MDC as pinot.query.cid (remember request id is registered as pinot.query.id)

requestContext.setBrokerId(_brokerId);
long requestId = _requestIdGenerator.get();
requestContext.setRequestId(requestId);
try (QueryThreadContext.CloseableContext closeMe = QueryThreadContext.open()) {
Contributor Author

Given that I had to introduce a try-with-resources here, I recommend reviewing this class with the hide-whitespace option in GitHub.

@@ -151,6 +152,7 @@ public void shutDown() {
protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndOptions sqlNodeAndOptions,
JsonNode request, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext,
HttpHeaders httpHeaders, AccessControl accessControl) {
QueryThreadContext.setQueryEngine("mse");
Contributor Author

One decision I had to make was whether this should be an enum instead, but I decided to use Strings to make it easier to add new engines in the future.

*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
*/
package org.apache.pinot.common.request;

@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.15.0)", date = "2024-05-01")
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.19.0)", date = "2025-03-03")
Contributor Author

@gortiz gortiz Mar 4, 2025

I had to regenerate all these thrift-generated classes because I needed to add the new cid parameter.

Comment on lines -145 to -160
MdcExecutor mdcExecutor = new MdcExecutor(executorService) {
@Override
protected boolean alreadyRegistered() {
return LoggerConstants.QUERY_ID_KEY.isRegistered();
}

@Override
protected void registerInMdc() {
queryRequest.registerInMdc();
}

@Override
protected void unregisterFromMdc() {
queryRequest.unregisterFromMdc();
}
};
Contributor Author

We don't need to set MDC here because we already did it when calling QueryThreadContext.setIds.

Comment on lines 125 to 130
return ListenableFutureTask.create(() -> {
try (QueryThreadContext.CloseableContext closeme = QueryThreadContext.open()) {
queryRequest.registerOnQueryThreadLocal();
return processQueryAndSerialize(queryRequest, executorService);
}
});
Contributor Author

@gortiz gortiz Mar 4, 2025

This is the code that ensures SSE operators have the context initialized.

@@ -125,9 +126,10 @@ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
byte[] requestBytes = null;
String tableNameWithType = null;

try {
try (QueryThreadContext.CloseableContext closeme = QueryThreadContext.open()) {
Contributor Author

This is where the Netty thread starts to run SSE queries.


codecov-commenter commented Mar 4, 2025

Codecov Report

Attention: Patch coverage is 6.13383% with 505 lines in your changes missing coverage. Please review.

Project coverage is 34.18%. Comparing base (59551e4) to head (56ce277).
Report is 1837 commits behind head on master.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...org/apache/pinot/spi/query/QueryThreadContext.java | 5.73% | 145 Missing and 3 partials ⚠️ |
| ...apache/pinot/query/service/server/QueryServer.java | 0.00% | 77 Missing ⚠️ |
| ...org/apache/pinot/query/MseWorkerThreadContext.java | 0.00% | 74 Missing ⚠️ |
| ...che/pinot/core/transport/grpc/GrpcQueryServer.java | 0.00% | 53 Missing ⚠️ |
| ...roker/requesthandler/BaseBrokerRequestHandler.java | 35.48% | 33 Missing and 7 partials ⚠️ |
| ...va/org/apache/pinot/query/runtime/QueryRunner.java | 0.00% | 37 Missing ⚠️ |
| ...pinot/broker/api/resources/PinotClientRequest.java | 0.00% | 13 Missing ⚠️ |
| ...roker/requesthandler/GrpcBrokerRequestHandler.java | 0.00% | 8 Missing ⚠️ |
| ...e/pinot/query/service/dispatch/DispatchClient.java | 0.00% | 7 Missing ⚠️ |
| ...ot/common/utils/grpc/BrokerGrpcRequestBuilder.java | 0.00% | 6 Missing ⚠️ |
| ... and 13 more | | |

❗ There is a different number of reports uploaded between BASE (59551e4) and HEAD (56ce277).

HEAD has 28 uploads less than BASE

| Flag | BASE (59551e4) | HEAD (56ce277) |
|---|---|---|
| integration | 7 | 3 |
| temurin | 12 | 6 |
| java-21 | 7 | 4 |
| skip-bytebuffers-true | 3 | 2 |
| skip-bytebuffers-false | 7 | 4 |
| unittests | 5 | 3 |
| unittests1 | 2 | 0 |
| java-11 | 5 | 2 |
| integration1 | 2 | 0 |
| custom-integration1 | 2 | 0 |
Additional details and impacted files
@@              Coverage Diff              @@
##             master   #15180       +/-   ##
=============================================
- Coverage     61.75%   34.18%   -27.57%     
- Complexity      207      684      +477     
=============================================
  Files          2436     2774      +338     
  Lines        133233   156537    +23304     
  Branches      20636    24019     +3383     
=============================================
- Hits          82274    53511    -28763     
- Misses        44911    98742    +53831     
+ Partials       6048     4284     -1764     
| Flag | Coverage Δ |
|---|---|
| custom-integration1 | ? |
| integration | 0.00% <ø> (-0.01%) ⬇️ |
| integration1 | ? |
| integration2 | 0.00% <ø> (ø) |
| java-11 | 34.15% <6.13%> (-27.56%) ⬇️ |
| java-21 | 34.13% <6.13%> (-27.50%) ⬇️ |
| skip-bytebuffers-false | 34.18% <6.13%> (-27.57%) ⬇️ |
| skip-bytebuffers-true | 34.11% <6.13%> (+6.38%) ⬆️ |
| temurin | 34.18% <6.13%> (-27.57%) ⬇️ |
| unittests | 34.18% <6.13%> (-27.57%) ⬇️ |
| unittests1 | ? |
| unittests2 | 34.18% <6.13%> (+6.45%) ⬆️ |

@ankitsultana ankitsultana requested a review from vvivekiyer March 4, 2025 23:15
@gortiz gortiz marked this pull request as draft March 5, 2025 07:26
@gortiz gortiz marked this pull request as ready for review March 6, 2025 07:29
if ("strict".equalsIgnoreCase(mode)) {
_strictMode = true;
}
if (mode != null && !mode.isEmpty()) {
Collaborator

Confused by the sequence of lines. Shouldn't the first check be whether it is null or empty, and only then whether it is strict?

Contributor Author

The code is more difficult to read than what you suggested, but I wanted to fail in case something other than null, "" or "strict" is used. I think I can rewrite it to be a bit more readable.
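One possible shape for that more readable version is sketched below. It is only an illustration of the stated intent (accept null, "" or "strict", reject anything else): `ModeSketch` and `parseStrictMode` are hypothetical names, and the exception type is an assumption.

```java
/** Hypothetical sketch of the mode check discussed above; not the PR's final code. */
final class ModeSketch {
  /**
   * Returns true for "strict" (case-insensitive), false for null or empty,
   * and fails fast on any other value.
   */
  static boolean parseStrictMode(String mode) {
    if (mode == null || mode.isEmpty()) {
      return false; // nothing configured: keep the default, non-strict behavior
    }
    if ("strict".equalsIgnoreCase(mode)) {
      return true;
    }
    throw new IllegalArgumentException("Unsupported mode: '" + mode + "'");
  }

  public static void main(String[] args) {
    System.out.println(parseStrictMode(null));
    System.out.println(parseStrictMode("Strict"));
  }
}
```

Checking for null or empty first, as the reviewer suggested, leaves a single remaining branch where an unknown value can be rejected.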

@@ -947,6 +950,7 @@ static String addRoutingPolicyInErrMsg(String errorMessage, String realtimeRouti
@Override
protected void onQueryStart(long requestId, String clientRequestId, String query, Object... extras) {
super.onQueryStart(requestId, clientRequestId, query, extras);
QueryThreadContext.setQueryEngine("sse");
Collaborator

There is an enum. Can that be used instead? I know it's not appropriately named right now.

Contributor Author

I think I added a GH discussion explaining why I didn't do that, but it could be hidden after my latest changes in the code.

I thought about using the enum, but decided against it because enums are not extensible and we may be adding more engines (maybe as an SPI) in the future. This is especially true for TSE, which seems to be following that path. Anyway, I'm open to using an enum right now and changing it back to a String or another class in the future if we find it problematic.

_executorService = ExecutorServiceUtils.create(config, Server.MULTISTAGE_EXECUTOR_CONFIG_PREFIX,
"query-runner-on-" + port, Server.DEFAULT_MULTISTAGE_EXECUTOR_TYPE);

int hardLimit = HardLimitExecutor.getMultiStageExecutorHardLimit(config);
Collaborator

Where was this block of code moved to?

Contributor Author

It was deleted by mistake while resolving a conflict during a merge. It is fixed in the last commit.

@@ -125,9 +132,9 @@ public void shutdown() {

@Override
public void submit(Worker.QueryRequest request, StreamObserver<Worker.QueryResponse> responseObserver) {
-Map<String, String> requestMetadata;
+Map<String, String> reqMetadata;
Collaborator

Why was the variable name changed?

Contributor Author

Just to be able to write the try below in a single line:

    try (QueryThreadContext.CloseableContext queryTlClosable = QueryThreadContext.openFromRequestMetadata(reqMetadata);

Contributor Author

gortiz commented Mar 12, 2025

@vrajat can you take another look?

Contributor

@yashmayya yashmayya left a comment

Thanks @gortiz, this is amazing!

Comment on lines +431 to +437
private long _startTimeMs;
private long _deadlineMs;
private String _brokerId;
private long _requestId;
private String _cid;
private String _sql;
private String _queryEngine;
Contributor

Are most of these not being used currently? Is there a plan to extend the MDC usage to include this information in logging?

Contributor Author

No, most of these values are not added to the log context (although we could). I've added them because I think they are useful to debug queries (I mean, using breakpoints) and because future code may be interested in using them for their own purposes. For example, we don't need to spread deadlines or request ids in the code anymore.

Comment on lines 403 to 404
long reqId = Long.parseLong(id);
QueryThreadContext.setIds(reqId, id);
Contributor

The id here could be request ID or client ID right? And if it's a client ID string set by the user, can't this long parsing fail?

Contributor Author

You are right. In fact, the code this morning was different, but it looked a bit strange to only set the ids in one of the branches. I changed the code to make it easier to read. Now the broker handlers have a method to return the request id given a client id and in case it is not found we use a default value instead.
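As a rough illustration of the lookup described in this reply, the sketch below resolves a request id from either kind of id and uses a default when a client id is unknown. All names here (`RequestIdLookupSketch`, `onQueryStart`, `requestIdFor`, `UNKNOWN_REQUEST_ID`) and the choice of `-1` as the default are assumptions; the actual broker handler method may look different.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of resolving a request id from a client id; not the PR's code. */
final class RequestIdLookupSketch {
  /** Assumed placeholder used when a client id has no known request id. */
  static final long UNKNOWN_REQUEST_ID = -1L;

  private final Map<String, Long> _requestIdByClientId = new ConcurrentHashMap<>();

  /** Remembers which broker request id belongs to a client-chosen id. */
  void onQueryStart(long requestId, String clientQueryId) {
    if (clientQueryId != null) {
      _requestIdByClientId.put(clientQueryId, requestId);
    }
  }

  /**
   * Resolves the numeric request id. Client ids are arbitrary strings, so they
   * are looked up instead of parsed; only broker request ids go through parseLong.
   */
  long requestIdFor(String id, boolean isClientId) {
    if (isClientId) {
      return _requestIdByClientId.getOrDefault(id, UNKNOWN_REQUEST_ID);
    }
    return Long.parseLong(id);
  }

  public static void main(String[] args) {
    RequestIdLookupSketch lookup = new RequestIdLookupSketch();
    lookup.onQueryStart(7L, "my-query");
    System.out.println(lookup.requestIdFor("my-query", true));
    System.out.println(lookup.requestIdFor("not-registered", true));
  }
}
```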

public static class Request {
public static class MetadataKeys {
public static final String REQUEST_ID = "requestId";
public static final String CORRELATION_ID = "correlationId";
Contributor

Just to clarify, this will be set to the clientQueryId if provided by the user in the query or else it will be the broker generated request ID for the query, correct?

Contributor Author

Yes. Let me add some extra Javadoc.

public static class Request {
public static class MetadataKeys {
public static final String REQUEST_ID = "requestId";
public static final String CORRELATION_ID = "correlationId";
Contributor

Also I see that one of the places where this key is being read is in QueryThreadContext::openFromRequestMetadata - which is being called from the multi-stage engine's gRPC query server. But the only place I see it being set is in the single-stage engine's gRPC request builder. Am I missing something or should we be setting this key in the request metadata sent from the MSE's query dispatcher to the MSE gRPC query server as well?

Contributor Author

You are right, I missed that because it ended up being initialized to the same value. It should also be set in QueryDispatcher. SSE was fine because there it is set through the Thrift binaries.

Comment on lines +74 to +80
String cid = QueryThreadContext.isInitialized() && QueryThreadContext.getCid() != null
? QueryThreadContext.getCid()
: Long.toString(requestId);
Worker.CancelRequest cancelRequest = Worker.CancelRequest.newBuilder()
.setRequestId(requestId)
.setCid(cid)
.build();
Contributor

What exactly is this change required for? I'm not sure I follow it.

Contributor Author

Here we are doing the following:

  1. Get the cid or build one using the request id in case it is not present in the context. The latter is probably not needed, but it makes the code cleaner and safer.
  2. The code that builds the CancelRequest has been changed to be closer to what is expected in fluent APIs, basically a method per line.
  3. We also add the cid as an attribute.

The latter is needed to be able to set the cid on the QueryThreadContexts of the servers that are going to cancel the query. They don't need the cid to cancel the query, but we definitely want to include it in the cancellation logs.

@@ -77,12 +85,16 @@ public ServerQueryRequest(InstanceRequest instanceRequest, ServerMetrics serverM
_timerContext = new TimerContext(_queryContext.getTableName(), serverMetrics, queryArrivalTimeMs);
}

/**
* This is called from the grpc server to create a ServerQueryRequest from the grpc request.
Contributor

Might be worth clarifying that this is the SSE gRPC server

Contributor Author

The constructor above indicates in its Javadoc that it is the one used by MSE. Anyway, I'm adding more Javadoc to make it easier to understand.

Comment on lines +99 to +100
ExecutorService innerExecutorService = QueryThreadContext.contextAwareExecutorService(queryExecutorService);
ListenableFutureTask<byte[]> queryTask = createQueryFutureTask(queryRequest, innerExecutorService);
Contributor

Could we add the context aware wrapper directly in the resource manager instead of wrapping it at all the call sites? Or is there a particular reason you chose to do it this way?

Contributor

I just noticed that you're also explicitly adding the context opening to the query's ListenableFutureTask so I'm guessing there's a particular reason for doing it this way that I'm missing.

Contributor Author

That is a very good question. That part of the code is really complex to read. In fact, I had to debug it to remember why I was doing that.

The key point is that there are two executors here:

  • The one behind ResourceManager, which runs the future we just created.
  • innerExecutorService, which can be used inside our future to parallelize query processing (e.g. a group-by combine).

The one in ResourceManager is not a context-aware executor and, in fact, ResourceManager is used as an SPI to provide some executor features. We could delegate the responsibility of maintaining the thread context to ResourceManager, but that would change the current semantics and break third-party ResourceManager implementations. Therefore I decided to manage the context in the ListenableFuture itself.

Once the ListenableFuture has the context set, if it uses innerExecutorService to parallelize something, it will be able to get the context from the thread running the ListenableFuture.
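The interplay between the two executors can be sketched with plain JDK classes. This is a simplified illustration, not the PR's implementation: the context is reduced to a single thread-local string, `TwoExecutorsSketch`, `withParentContext` and `runQuery` are hypothetical names, and wrapping each task stands in for the decorated executor returned by `QueryThreadContext.contextAwareExecutorService`.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical sketch of the outer/inner executor split; not Pinot's actual code. */
final class TwoExecutorsSketch {
  // Stand-in for the thread context: just the cid of the running query.
  private static final ThreadLocal<String> CID = new ThreadLocal<>();

  /** Captures the submitting thread's cid and installs it around the task. */
  static <T> Callable<T> withParentContext(Callable<T> task) {
    String parentCid = CID.get(); // read on the thread that submits the task
    return () -> {
      CID.set(parentCid); // installed on the worker thread
      try {
        return task.call();
      } finally {
        CID.remove(); // do not leak the context into the pooled thread
      }
    };
  }

  static String runQuery(String cid) {
    // Plain executor, playing the role of the one behind ResourceManager.
    ExecutorService outer = Executors.newSingleThreadExecutor();
    // Executor used inside the query task to parallelize work.
    ExecutorService inner = Executors.newFixedThreadPool(2);
    try {
      Future<String> result = outer.submit(() -> {
        CID.set(cid); // the task itself opens the context, since outer is not context aware
        try {
          Callable<String> segmentTask = () -> "segment saw " + CID.get();
          return inner.submit(withParentContext(segmentTask)).get();
        } finally {
          CID.remove();
        }
      });
      return result.get();
    } catch (Exception e) {
      throw new IllegalStateException(e);
    } finally {
      outer.shutdown();
      inner.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(runQuery("client-abc"));
  }
}
```

Because the outer executor is left untouched, an implementation supplied by a third party keeps working; only the task body and the inner executor need to know about the context.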

4 participants