-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update the Supervisor endpoint to not restart the Supervisor if the spec was unmodified #17707
base: master
Are you sure you want to change the base?
Changes from all commits
e2800b3
2bb4603
a8db3de
29dbd28
22d6a94
3463634
fe3954e
34c5d80
47a0144
360310b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -19,12 +19,15 @@ | |||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
package org.apache.druid.indexing.overlord.supervisor; | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
import com.fasterxml.jackson.core.JsonProcessingException; | ||||||||||||||||||||||||||||
import com.fasterxml.jackson.databind.ObjectMapper; | ||||||||||||||||||||||||||||
import com.google.common.base.Optional; | ||||||||||||||||||||||||||||
import com.google.common.base.Preconditions; | ||||||||||||||||||||||||||||
import com.google.common.util.concurrent.ListenableFuture; | ||||||||||||||||||||||||||||
import com.google.inject.Inject; | ||||||||||||||||||||||||||||
import org.apache.druid.common.guava.FutureUtils; | ||||||||||||||||||||||||||||
import org.apache.druid.error.DruidException; | ||||||||||||||||||||||||||||
import org.apache.druid.guice.annotations.Json; | ||||||||||||||||||||||||||||
import org.apache.druid.indexing.common.TaskLockType; | ||||||||||||||||||||||||||||
import org.apache.druid.indexing.common.task.Tasks; | ||||||||||||||||||||||||||||
import org.apache.druid.indexing.overlord.DataSourceMetadata; | ||||||||||||||||||||||||||||
|
@@ -42,6 +45,7 @@ | |||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
import javax.annotation.Nullable; | ||||||||||||||||||||||||||||
import java.util.ArrayList; | ||||||||||||||||||||||||||||
import java.util.Arrays; | ||||||||||||||||||||||||||||
import java.util.List; | ||||||||||||||||||||||||||||
import java.util.Map; | ||||||||||||||||||||||||||||
import java.util.Set; | ||||||||||||||||||||||||||||
|
@@ -62,10 +66,12 @@ public class SupervisorManager | |||||||||||||||||||||||||||
private final Object lock = new Object(); | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
private volatile boolean started = false; | ||||||||||||||||||||||||||||
private final ObjectMapper jsonMapper; | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
@Inject | ||||||||||||||||||||||||||||
public SupervisorManager(MetadataSupervisorManager metadataSupervisorManager) | ||||||||||||||||||||||||||||
public SupervisorManager(@Json ObjectMapper jsonMapper, MetadataSupervisorManager metadataSupervisorManager) | ||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||
this.jsonMapper = jsonMapper; | ||||||||||||||||||||||||||||
this.metadataSupervisorManager = metadataSupervisorManager; | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
|
@@ -166,6 +172,36 @@ public boolean createOrUpdateAndStartSupervisor(SupervisorSpec spec) | |||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||||
* Checks whether the submitted SupervisorSpec differs from the current spec in SupervisorManager's supervisor list. | ||||||||||||||||||||||||||||
* This is used in SupervisorResource specPost to determine whether the Supervisor needs to be restarted | ||||||||||||||||||||||||||||
* @param spec The spec submitted | ||||||||||||||||||||||||||||
* @return boolean - false if the spec is unchanged, else true | ||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||
public boolean shouldUpdateSupervisor(SupervisorSpec spec) | ||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Based on the suggestion from @AmatyaAvadhanula , you can also update the /**
* Creates or updates a supervisor and then starts it.
* If no change has been made to the supervisor spec, it is only restarted.
*
* @return true if the supervisor was updated, false otherwise
*/
public boolean createOrUpdateAndStartSupervisor(SupervisorSpec spec)
{
Preconditions.checkState(started, "SupervisorManager not started");
Preconditions.checkNotNull(spec, "spec");
Preconditions.checkNotNull(spec.getId(), "spec.getId()");
Preconditions.checkNotNull(spec.getDataSources(), "spec.getDatasources()");
synchronized (lock) {
Preconditions.checkState(started, "SupervisorManager not started");
// Persist a new version of the spec only if it has been updated
final boolean shouldUpdateSpec = shouldUpdateSupervisor(spec);
possiblyStopAndRemoveSupervisorInternal(spec.getId(), false);
createAndStartSupervisorInternal(spec, shouldUpdateSpec);
return shouldUpdateSpec;
}
} |
||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||
Preconditions.checkState(started, "SupervisorManager not started"); | ||||||||||||||||||||||||||||
Preconditions.checkNotNull(spec, "spec"); | ||||||||||||||||||||||||||||
Preconditions.checkNotNull(spec.getId(), "spec.getId()"); | ||||||||||||||||||||||||||||
Preconditions.checkNotNull(spec.getDataSources(), "spec.getDatasources()"); | ||||||||||||||||||||||||||||
synchronized (lock) { | ||||||||||||||||||||||||||||
Preconditions.checkState(started, "SupervisorManager not started"); | ||||||||||||||||||||||||||||
try { | ||||||||||||||||||||||||||||
byte[] specAsBytes = jsonMapper.writeValueAsBytes(spec); | ||||||||||||||||||||||||||||
Pair<Supervisor, SupervisorSpec> currentSupervisor = supervisors.get(spec.getId()); | ||||||||||||||||||||||||||||
if (currentSupervisor != null && | ||||||||||||||||||||||||||||
Arrays.equals(specAsBytes, jsonMapper.writeValueAsBytes(currentSupervisor.rhs)) | ||||||||||||||||||||||||||||
) { | ||||||||||||||||||||||||||||
return false; | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
Comment on lines
+192
to
+196
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you may simplify this as follows:
Suggested change
|
||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
catch (JsonProcessingException ex) { | ||||||||||||||||||||||||||||
log.warn("Failed to write spec as bytes for spec_id[%s]", spec.getId()); | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
return true; | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
public boolean stopAndRemoveSupervisor(String id) | ||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||
Preconditions.checkState(started, "SupervisorManager not started"); | ||||||||||||||||||||||||||||
|
@@ -363,8 +399,12 @@ public boolean registerUpgradedPendingSegmentOnSupervisor( | |||||||||||||||||||||||||||
return true; | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
catch (Exception e) { | ||||||||||||||||||||||||||||
log.error(e, "Failed to upgrade pending segment[%s] to new pending segment[%s] on Supervisor[%s].", | ||||||||||||||||||||||||||||
upgradedPendingSegment.getUpgradedFromSegmentId(), upgradedPendingSegment.getId().getVersion(), supervisorId); | ||||||||||||||||||||||||||||
log.error(e, | ||||||||||||||||||||||||||||
"Failed to upgrade pending segment[%s] to new pending segment[%s] on Supervisor[%s].", | ||||||||||||||||||||||||||||
upgradedPendingSegment.getUpgradedFromSegmentId(), | ||||||||||||||||||||||||||||
upgradedPendingSegment.getId().getVersion(), | ||||||||||||||||||||||||||||
supervisorId | ||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||
Comment on lines
+402
to
+407
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
return false; | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -119,7 +119,11 @@ public SupervisorResource( | |
@POST | ||
@Consumes(MediaType.APPLICATION_JSON) | ||
@Produces(MediaType.APPLICATION_JSON) | ||
public Response specPost(final SupervisorSpec spec, @Context final HttpServletRequest req) | ||
public Response specPost( | ||
final SupervisorSpec spec, | ||
@Context final HttpServletRequest req, | ||
@QueryParam("skipRestartIfUnmodified") boolean skipRestartIfUnmodified | ||
Comment on lines
+124
to
+125
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Style: Make the request the last argument of this method.
Comment on lines
+124
to
+125
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use a boxed |
||
) | ||
{ | ||
return asLeaderWithSupervisorManager( | ||
manager -> { | ||
|
@@ -151,6 +155,9 @@ public Response specPost(final SupervisorSpec spec, @Context final HttpServletRe | |
if (!authResult.allowAccessWithNoRestriction()) { | ||
throw new ForbiddenException(authResult.getErrorMessage()); | ||
} | ||
if (skipRestartIfUnmodified && !manager.shouldUpdateSupervisor(spec)) { | ||
return Response.ok(ImmutableMap.of("id", spec.getId())).build(); | ||
} | ||
|
||
manager.createOrUpdateAndStartSupervisor(spec); | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,7 @@ | |
|
||
package org.apache.druid.indexing.overlord.supervisor; | ||
|
||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
import com.google.common.base.Optional; | ||
import com.google.common.collect.ImmutableList; | ||
import com.google.common.collect.ImmutableMap; | ||
|
@@ -33,6 +34,7 @@ | |
import org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMetadata; | ||
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; | ||
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec; | ||
import org.apache.druid.jackson.DefaultObjectMapper; | ||
import org.apache.druid.java.util.common.DateTimes; | ||
import org.apache.druid.java.util.common.Intervals; | ||
import org.apache.druid.metadata.MetadataSupervisorManager; | ||
|
@@ -60,6 +62,7 @@ | |
@RunWith(EasyMockRunner.class) | ||
public class SupervisorManagerTest extends EasyMockSupport | ||
{ | ||
private static final ObjectMapper MAPPER = new DefaultObjectMapper(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. add an empty line after this |
||
@Mock | ||
private MetadataSupervisorManager metadataSupervisorManager; | ||
|
||
|
@@ -80,7 +83,7 @@ public class SupervisorManagerTest extends EasyMockSupport | |
@Before | ||
public void setUp() | ||
{ | ||
manager = new SupervisorManager(metadataSupervisorManager); | ||
manager = new SupervisorManager(MAPPER, metadataSupervisorManager); | ||
} | ||
|
||
@Test | ||
|
@@ -175,6 +178,21 @@ public void testCreateOrUpdateAndStartSupervisorNullSpecId() | |
verifyAll(); | ||
} | ||
|
||
@Test | ||
public void testShouldUpdateSupervisor() | ||
{ | ||
SupervisorSpec spec = new TestSupervisorSpec("id1", supervisor1); | ||
SupervisorSpec spec2 = new TestSupervisorSpec("id2", supervisor2); | ||
Map<String, SupervisorSpec> existingSpecs = ImmutableMap.of( | ||
"id1", spec | ||
); | ||
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs); | ||
supervisor1.start(); | ||
replayAll(); | ||
manager.start(); | ||
Assert.assertFalse(manager.shouldUpdateSupervisor(spec)); | ||
Assert.assertTrue(manager.shouldUpdateSupervisor(spec2)); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. add an empty line after this. |
||
@Test | ||
public void testStopAndRemoveSupervisorNotStarted() | ||
{ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.