Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CELEBORN-1601] Support revise lost shuffles #2746

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open

Conversation

FMX
Copy link
Contributor

@FMX FMX commented Sep 19, 2024

What changes were proposed in this pull request?

To support revising lost shuffle IDs in a long-running job such as flink batch jobs.

Why are the changes needed?

  1. To support revise lost shuffles.
  2. To add an HTTP endpoint to revise lost shuffles manually.

Does this PR introduce any user-facing change?

NO.

How was this patch tested?

Cluster tests.

@cxzl25 cxzl25 changed the title [CELEBORN-1600] Support revise lost shuffles [CELEBORN-1601] Support revise lost shuffles Sep 19, 2024
@SteNicholas
Copy link
Member

@FMX, could you also support the corresponding cli command for the HTTP endpoint to revise lost shuffles?

@FMX
Copy link
Contributor Author

FMX commented Sep 20, 2024

@FMX, could you also support the corresponding cli command for the HTTP endpoint to revise lost shuffles?

Sounds good. I'll add the cli command.

@SteNicholas
Copy link
Member

@FMX, BTW, the HTTP endpoint should introduce the client api to invoke, which could follow README.md to add.

@FMX
Copy link
Contributor Author

FMX commented Sep 23, 2024

@SteNicholas Thanks. I have added the CLI command and the API endpoint. Please review this PR when you have time.

if (masterOptions.addClusterAlias != null && masterOptions.addClusterAlias.nonEmpty)
runAddClusterAlias
if (masterOptions.removeClusterAlias != null && masterOptions.removeClusterAlias.nonEmpty)
if (masterOptions.removeClusterAlias != null && masterOptions.removeClusterAlias.nonEmpty) {
Copy link
Member

@SteNicholas SteNicholas Sep 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you change this?

@@ -107,6 +107,9 @@ enum MessageType {
REPORT_BARRIER_STAGE_ATTEMPT_FAILURE_RESPONSE = 84;
SEGMENT_START = 85;
NOTIFY_REQUIRED_SEGMENT = 86;

REVISE_LOST_SHUFFLES = 202;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does the number start from 202?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To keep compatible with our internal version.

@@ -433,6 +436,7 @@ message PbHeartbeatFromApplicationResponse {
repeated PbWorkerInfo excludedWorkers = 2;
repeated PbWorkerInfo unknownWorkers = 3;
repeated PbWorkerInfo shuttingWorkers = 4;
repeated int32 registeredShuffles = 6;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
repeated int32 registeredShuffles = 6;
repeated int32 registeredShuffles = 5;

@@ -73,6 +75,7 @@ message ResourceRequest {
optional WorkerEventRequest workerEventRequest = 22;
optional ApplicationMetaRequest applicationMetaRequest = 23;
optional ReportWorkerDecommissionRequest reportWorkerDecommissionRequest = 24;
optional ReviseLostShufflesRequest reviseLostShufflesRequest = 102;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the number 102?

@@ -1069,13 +1096,18 @@ private[celeborn] class Master(
if (shouldResponse) {
// UserResourceConsumption and DiskInfo are eliminated from WorkerInfo
// during serialization of HeartbeatFromApplicationResponse
var appRelatedShuffles = statusSystem.registeredAppAndShuffles.get(appId)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
var appRelatedShuffles = statusSystem.registeredAppAndShuffles.get(appId)
val appRelatedShuffles = statusSystem.registeredAppAndShuffles.getOrDefault(appId, Collecitons.emptySet())

@FMX FMX force-pushed the b1600 branch 2 times, most recently from e71ac9e to 1daa172 Compare September 23, 2024 13:19
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants