Skip to content

Commit

Permalink
save
Browse files Browse the repository at this point in the history
  • Loading branch information
codenohup authored and reswqa committed Aug 14, 2024
1 parent 7d0cd40 commit a9d137d
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
public class FlinkShuffleClientImpl extends ShuffleClientImpl {
public static final Logger logger = LoggerFactory.getLogger(FlinkShuffleClientImpl.class);
/** The default buffer size bytes in flink is 32KB. */
private static int DEFAULT_BUFFER_SIZE_BYTES = 32 * 1024;
public static int DEFAULT_BUFFER_SIZE_BYTES = 32 * 1024;

private static volatile FlinkShuffleClientImpl _instance;
private static volatile boolean initialized = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,13 @@ public void setup() throws IOException, InterruptedException {
conf = new CelebornConf();
shuffleClient =
new FlinkShuffleClientImpl(
"APP", "localhost", 1232, System.currentTimeMillis(), conf, null) {
"APP",
"localhost",
1232,
System.currentTimeMillis(),
conf,
null,
FlinkShuffleClientImpl.DEFAULT_BUFFER_SIZE_BYTES) {
@Override
public void setupLifecycleManagerRef(String host, int port) {}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ class HeartbeatTest extends AnyFunSuite with Logging with MiniClusterFeature wit
0,
System.currentTimeMillis(),
clientConf,
new UserIdentifier("1", "1")) {
new UserIdentifier("1", "1"),
FlinkShuffleClientImpl.DEFAULT_BUFFER_SIZE_BYTES) {
override def setupLifecycleManagerRef(host: String, port: Int): Unit = {}
}
testHeartbeatFromWorker2Client(flinkShuffleClientImpl.getDataClientFactory)
Expand All @@ -52,7 +53,8 @@ class HeartbeatTest extends AnyFunSuite with Logging with MiniClusterFeature wit
0,
System.currentTimeMillis(),
clientConf,
new UserIdentifier("1", "1")) {
new UserIdentifier("1", "1"),
FlinkShuffleClientImpl.DEFAULT_BUFFER_SIZE_BYTES) {
override def setupLifecycleManagerRef(host: String, port: Int): Unit = {}
}
testHeartbeatFromWorker2ClientWithNoHeartbeat(flinkShuffleClientImpl.getDataClientFactory)
Expand All @@ -67,7 +69,8 @@ class HeartbeatTest extends AnyFunSuite with Logging with MiniClusterFeature wit
0,
System.currentTimeMillis(),
clientConf,
new UserIdentifier("1", "1")) {
new UserIdentifier("1", "1"),
FlinkShuffleClientImpl.DEFAULT_BUFFER_SIZE_BYTES) {
override def setupLifecycleManagerRef(host: String, port: Int): Unit = {}
}
testHeartbeatFromWorker2ClientWithCloseChannel(flinkShuffleClientImpl.getDataClientFactory)
Expand Down

0 comments on commit a9d137d

Please sign in to comment.