Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Aravind Patnam committed Aug 13, 2024
1 parent 35e3187 commit 5b19353
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,10 @@ import org.apache.celeborn.common.util.{CollectionUtils => localCollectionUtils}

object PbSerDeUtils {

private var masterPersistWorkerNetworkLocation: Option[Boolean] = None

def setMasterPersistWorkerNetworkLocation(value: Boolean): Unit = {
masterPersistWorkerNetworkLocation match {
case None => masterPersistWorkerNetworkLocation = Some(value)
case Some(_) =>
// this should never happen, but being defensive
throw new IllegalStateException(
s"masterPersistWorkerNetworkLocation has already been set once to" +
s" ${masterPersistWorkerNetworkLocation.get}")
}
private var masterPersistWorkerNetworkLocation: Boolean = false

def setMasterPersistWorkerNetworkLocation(masterPersistWorkerNetworkLocation: Boolean) = {
this.masterPersistWorkerNetworkLocation = masterPersistWorkerNetworkLocation
}

@throws[InvalidProtocolBufferException]
Expand Down Expand Up @@ -249,7 +242,7 @@ object PbSerDeUtils {
pbWorkerInfo.getInternalPort,
disks,
userResourceConsumption)
if (masterPersistWorkerNetworkLocation.getOrElse(false)) {
if (masterPersistWorkerNetworkLocation) {
workerInfo.networkLocation_$eq(pbWorkerInfo.getNetworkLocation)
}
workerInfo
Expand All @@ -266,7 +259,7 @@ object PbSerDeUtils {
.setPushPort(workerInfo.pushPort)
.setReplicatePort(workerInfo.replicatePort)
.setInternalPort(workerInfo.internalPort)
if (masterPersistWorkerNetworkLocation.getOrElse(false)) {
if (masterPersistWorkerNetworkLocation) {
builder.setNetworkLocation(workerInfo.networkLocation)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,17 +251,19 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
}

test("fromAndToPbWorkerInfo") {
PbSerDeUtils.setMasterPersistWorkerNetworkLocation(true)
val pbWorkerInfo = PbSerDeUtils.toPbWorkerInfo(workerInfo1, false, false)
val pbWorkerInfoWithEmptyResource = PbSerDeUtils.toPbWorkerInfo(workerInfo1, true, false)
val restoredWorkerInfo = PbSerDeUtils.fromPbWorkerInfo(pbWorkerInfo)
val restoredWorkerInfoWithEmptyResource =
PbSerDeUtils.fromPbWorkerInfo(pbWorkerInfoWithEmptyResource)

assert(restoredWorkerInfo.equals(workerInfo1))
assert(restoredWorkerInfoWithEmptyResource.userResourceConsumption.equals(new util.HashMap[
UserIdentifier,
ResourceConsumption]()))
Seq(false, true).foreach { b =>
PbSerDeUtils.setMasterPersistWorkerNetworkLocation(b)
val pbWorkerInfo = PbSerDeUtils.toPbWorkerInfo(workerInfo1, false, false)
val pbWorkerInfoWithEmptyResource = PbSerDeUtils.toPbWorkerInfo(workerInfo1, true, false)
val restoredWorkerInfo = PbSerDeUtils.fromPbWorkerInfo(pbWorkerInfo)
val restoredWorkerInfoWithEmptyResource =
PbSerDeUtils.fromPbWorkerInfo(pbWorkerInfoWithEmptyResource)

assert(restoredWorkerInfo.equals(workerInfo1))
assert(restoredWorkerInfoWithEmptyResource.userResourceConsumption.equals(new util.HashMap[
UserIdentifier,
ResourceConsumption]()))
}
}

test("fromAndToPbPartitionLocation") {
Expand Down

0 comments on commit 5b19353

Please sign in to comment.