[CELEBORN-1490][CIP-6] Introduce tier producer in celeborn flink client
reswqa committed Sep 14, 2024
1 parent 1053129 commit 816a268
Showing 8 changed files with 843 additions and 39 deletions.
@@ -33,6 +33,7 @@
import org.apache.celeborn.common.exception.DriverChangedException;
import org.apache.celeborn.common.identity.UserIdentifier;
import org.apache.celeborn.common.protocol.PartitionLocation;
import org.apache.celeborn.plugin.flink.buffer.BufferHeader;
import org.apache.celeborn.plugin.flink.buffer.BufferPacker;
import org.apache.celeborn.plugin.flink.readclient.FlinkShuffleClientImpl;
import org.apache.celeborn.plugin.flink.utils.BufferUtils;
@@ -207,13 +208,13 @@ FlinkShuffleClientImpl getShuffleClient() {
}

/** Writes a piece of data to a subpartition. */
public void write(ByteBuf byteBuf, int subIdx) {
public void write(ByteBuf byteBuf, BufferHeader bufferHeader) {
try {
flinkShuffleClient.pushDataToLocation(
shuffleId,
mapId,
attemptId,
subIdx,
bufferHeader.getSubPartitionId(),
io.netty.buffer.Unpooled.wrappedBuffer(byteBuf.nioBuffer()),
partitionLocation,
() -> byteBuf.release());
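The write path now carries the subpartition index inside the BufferHeader rather than as a bare int, so callers build a header per buffer. A minimal caller sketch under assumed names — the class owning write is not named in this view, so `outputGate` is hypothetical:

```java
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;

import org.apache.celeborn.plugin.flink.buffer.BufferHeader;

// Hypothetical caller: push `data` to subpartition 3 through the new signature.
// The four-argument BufferHeader constructor added in this commit zeroes the
// remaining id fields and derives the full header size from the data size.
void pushExample(ByteBuf data) {
  outputGate.write(
      data,
      new BufferHeader(
          /* subPartitionId= */ 3,
          Buffer.DataType.DATA_BUFFER,
          /* isCompressed= */ false,
          data.readableBytes()));
}
```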
@@ -37,6 +37,11 @@ public BufferHeader(Buffer.DataType dataType, boolean isCompressed, int size) {
this(0, 0, 0, size + 2, dataType, isCompressed, size);
}

public BufferHeader(
int subPartitionId, Buffer.DataType dataType, boolean isCompressed, int size) {
this(subPartitionId, 0, 0, size + 2, dataType, isCompressed, size);
}

public BufferHeader(
int subPartitionId,
int attemptId,
@@ -54,6 +59,10 @@ public BufferHeader(
this.size = size;
}

public int getSubPartitionId() {
return subPartitionId;
}

public Buffer.DataType getDataType() {
return dataType;
}
@@ -33,22 +33,31 @@
import org.apache.celeborn.plugin.flink.utils.Utils;
import org.apache.celeborn.reflect.DynMethods;

/** Harness used to pack multiple partial buffers together as a full one. */
/**
 * Harness used to pack multiple partial buffers together as a full one. There are two Flink
 * integration strategies: Remote Shuffle Service and Hybrid Shuffle. In the Remote Shuffle
 * Service integration strategy, the {@link BufferPacker} receives buffers containing both shuffle
 * data and the Celeborn header. The Hybrid Shuffle integration strategy employs the subclass
 * {@link ReceivedNoHeaderBufferPacker}, which receives buffers containing only shuffle data. The
 * two strategies must therefore pack buffers differently, but the resulting packed buffer should
 * be the same.
 */
public class BufferPacker {
private static Logger logger = LoggerFactory.getLogger(BufferPacker.class);

public interface BiConsumerWithException<T, U, E extends Throwable> {
void accept(T var1, U var2) throws E;
}

private final BiConsumerWithException<ByteBuf, Integer, InterruptedException> ripeBufferHandler;
protected final BiConsumerWithException<ByteBuf, BufferHeader, InterruptedException>
ripeBufferHandler;

private Buffer cachedBuffer;
protected Buffer cachedBuffer;

private int currentSubIdx = -1;
protected int currentSubIdx = -1;

public BufferPacker(
BiConsumerWithException<ByteBuf, Integer, InterruptedException> ripeBufferHandler) {
BiConsumerWithException<ByteBuf, BufferHeader, InterruptedException> ripeBufferHandler) {
this.ripeBufferHandler = ripeBufferHandler;
}
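The ripe-buffer handler now receives the packed ByteBuf together with its BufferHeader instead of a bare subpartition index. A wiring sketch under assumed names, connecting a packer to the writer from the first file (`outputGate` is hypothetical):

```java
// `outputGate` stands in for the component exposing write(ByteBuf, BufferHeader).
BufferPacker packer =
    new BufferPacker((byteBuf, bufferHeader) -> outputGate.write(byteBuf, bufferHeader));
```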

@@ -71,7 +80,8 @@ public void process(Buffer buffer, int subIdx) throws InterruptedException {
int targetSubIdx = currentSubIdx;
currentSubIdx = subIdx;
logBufferPack(false, dumpedBuffer.getDataType(), dumpedBuffer.readableBytes());
handleRipeBuffer(dumpedBuffer, targetSubIdx);
handleRipeBuffer(
dumpedBuffer, targetSubIdx, dumpedBuffer.getDataType(), dumpedBuffer.isCompressed());
} else {
/**
* this is an optimization. if cachedBuffer can contain other buffer, then other buffer can
@@ -95,12 +105,13 @@ public void process(Buffer buffer, int subIdx) throws InterruptedException {
cachedBuffer = buffer;
logBufferPack(false, dumpedBuffer.getDataType(), dumpedBuffer.readableBytes());

handleRipeBuffer(dumpedBuffer, currentSubIdx);
handleRipeBuffer(
dumpedBuffer, currentSubIdx, dumpedBuffer.getDataType(), dumpedBuffer.isCompressed());
}
}
}

private void logBufferPack(boolean isDrain, Buffer.DataType dataType, int length) {
protected void logBufferPack(boolean isDrain, Buffer.DataType dataType, int length) {
logger.debug(
"isDrain:{}, cachedBuffer pack partition:{} type:{}, length:{}",
isDrain,
@@ -112,15 +123,27 @@ private void logBufferPack(boolean isDrain, Buffer.DataType dataType, int length
public void drain() throws InterruptedException {
if (cachedBuffer != null) {
logBufferPack(true, cachedBuffer.getDataType(), cachedBuffer.readableBytes());
handleRipeBuffer(cachedBuffer, currentSubIdx);
handleRipeBuffer(
cachedBuffer, currentSubIdx, cachedBuffer.getDataType(), cachedBuffer.isCompressed());
}
cachedBuffer = null;
currentSubIdx = -1;
}

private void handleRipeBuffer(Buffer buffer, int subIdx) throws InterruptedException {
protected void handleRipeBuffer(
Buffer buffer, int subIdx, Buffer.DataType dataType, boolean isCompressed)
throws InterruptedException {
// Always set the compress flag to false, because the result buffer generated by {@link
// BufferPacker} needs to be split into multiple buffers during the unpack process. If the
// compress flag were set to true on this result buffer, the unpack process would throw an
// exception, as a compressed buffer cannot be sliced.
buffer.setCompressed(false);
ripeBufferHandler.accept(buffer.asByteBuf(), subIdx);
ripeBufferHandler.accept(
buffer.asByteBuf(), new BufferHeader(subIdx, dataType, isCompressed, buffer.getSize()));
}

public boolean isEmpty() {
return cachedBuffer == null;
}

public void close() {
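For orientation, a lifecycle sketch of the packer with assumed Buffer instances buffer0..buffer2: buffers for the same subpartition are merged into the cached buffer when they fit, a subpartition switch flushes the cached buffer, and drain() flushes whatever remains:

```java
// buffer0..buffer2 are assumed org.apache.flink.runtime.io.network.buffer.Buffer instances.
packer.process(buffer0, /* subIdx= */ 0); // cached; nothing emitted yet
packer.process(buffer1, 0);               // packed into the cached buffer if it fits
packer.process(buffer2, 1);               // subpartition switch: emits the subIdx-0 buffer
packer.drain();                           // emits the last cached buffer
```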
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.celeborn.plugin.flink.buffer;

import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;

import org.apache.celeborn.plugin.flink.utils.BufferUtils;

/**
 * Harness used to pack multiple partial buffers together as a full one. It is currently used in
 * the Flink Hybrid Shuffle integration strategy.
 */
public class ReceivedNoHeaderBufferPacker extends BufferPacker {

/** The Flink buffer header of the first cached buffer. */
private BufferHeader firstBufferHeader;

public ReceivedNoHeaderBufferPacker(
BiConsumerWithException<ByteBuf, BufferHeader, InterruptedException> ripeBufferHandler) {
super(ripeBufferHandler);
}

@Override
public void process(Buffer buffer, int subIdx) throws InterruptedException {
if (buffer == null) {
return;
}

if (buffer.readableBytes() == 0) {
buffer.recycleBuffer();
return;
}

if (cachedBuffer == null) {
// cache the first buffer and record the Flink buffer header of the first buffer
cachedBuffer = buffer;
currentSubIdx = subIdx;
firstBufferHeader =
new BufferHeader(subIdx, buffer.getDataType(), buffer.isCompressed(), buffer.getSize());
} else if (currentSubIdx != subIdx) {
// drain the previous cached buffer and cache current buffer
Buffer dumpedBuffer = cachedBuffer;
cachedBuffer = buffer;
int targetSubIdx = currentSubIdx;
currentSubIdx = subIdx;
logBufferPack(false, dumpedBuffer.getDataType(), dumpedBuffer.readableBytes());
handleRipeBuffer(
dumpedBuffer, targetSubIdx, dumpedBuffer.getDataType(), dumpedBuffer.isCompressed());
firstBufferHeader =
new BufferHeader(subIdx, buffer.getDataType(), buffer.isCompressed(), buffer.getSize());
} else {
int bufferHeaderLength = BufferUtils.HEADER_LENGTH - BufferUtils.HEADER_LENGTH_PREFIX;
if (cachedBuffer.readableBytes() + buffer.readableBytes() + bufferHeaderLength
<= cachedBuffer.getMaxCapacity() - BufferUtils.HEADER_LENGTH) {
// if the cached buffer can contain the current buffer, then pack the current buffer into the
// cached buffer
ByteBuf byteBuf = cachedBuffer.asByteBuf();
byteBuf.writeByte(buffer.getDataType().ordinal());
byteBuf.writeBoolean(buffer.isCompressed());
byteBuf.writeInt(buffer.getSize());
byteBuf.writeBytes(buffer.asByteBuf(), 0, buffer.readableBytes());
logBufferPack(false, buffer.getDataType(), buffer.readableBytes() + bufferHeaderLength);

buffer.recycleBuffer();
} else {
// if the cached buffer cannot contain the current buffer, drain the cached buffer and cache
// the current buffer
Buffer dumpedBuffer = cachedBuffer;
cachedBuffer = buffer;
logBufferPack(false, dumpedBuffer.getDataType(), dumpedBuffer.readableBytes());

handleRipeBuffer(
dumpedBuffer, currentSubIdx, dumpedBuffer.getDataType(), dumpedBuffer.isCompressed());
firstBufferHeader =
new BufferHeader(subIdx, buffer.getDataType(), buffer.isCompressed(), buffer.getSize());
}
}
}

@Override
protected void handleRipeBuffer(
Buffer buffer, int subIdx, Buffer.DataType dataType, boolean isCompressed)
throws InterruptedException {
if (buffer == null || buffer.readableBytes() == 0) {
return;
}
// Always set the compress flag to false, because this buffer contains the Celeborn header and
// multiple Flink data buffers. Keeping the flag false is crucial: this buffer must be sliced
// during the unpacking process to extract the Flink data buffers, and the Flink
// {@link NetworkBuffer} cannot correctly slice a compressed buffer.
buffer.setCompressed(false);
ripeBufferHandler.accept(buffer.asByteBuf(), firstBufferHeader);
}
}
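process() above appends each follow-on buffer behind a 6-byte inline sub-header (data-type ordinal, compressed flag, 4-byte size), while the first buffer's metadata travels out-of-band in firstBufferHeader. A hypothetical unpack sketch — the real unpack path is not part of this commit — assuming BufferHeader exposes its recorded size via getSize() and ignoring the outer Celeborn header:

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;

import org.apache.celeborn.plugin.flink.buffer.BufferHeader;

// Hypothetical consumer: the first slice's length comes from the out-of-band header;
// each later slice replays the inline sub-header written in process().
static List<ByteBuf> unpackSketch(ByteBuf packed, BufferHeader firstHeader) {
  List<ByteBuf> slices = new ArrayList<>();
  slices.add(packed.readRetainedSlice(firstHeader.getSize())); // raw first buffer
  while (packed.isReadable()) {
    Buffer.DataType dataType = Buffer.DataType.values()[packed.readByte()]; // 1 byte
    boolean isCompressed = packed.readBoolean();                            // 1 byte
    int size = packed.readInt();                                            // 4 bytes
    // A real consumer would attach dataType/isCompressed to the slice's metadata.
    slices.add(packed.readRetainedSlice(size));
  }
  return slices;
}
```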
@@ -59,6 +59,26 @@ public static void setCompressedDataWithHeader(Buffer buffer, Buffer compressedB
buffer.setSize(dataLength + HEADER_LENGTH);
}

/**
 * Used in the Hybrid Shuffle integration strategy, where the buffer contains only data (no
 * header). Copies the data of the compressed buffer into the original buffer.
 */
public static void setCompressedDataWithoutHeader(Buffer buffer, Buffer compressedBuffer) {
checkArgument(buffer != null, "Must be not null.");
checkArgument(buffer.getReaderIndex() == 0, "Illegal reader index.");

boolean isCompressed = compressedBuffer != null && compressedBuffer.isCompressed();
int dataLength = isCompressed ? compressedBuffer.readableBytes() : buffer.readableBytes();
ByteBuf byteBuf = buffer.asByteBuf();
if (isCompressed) {
byteBuf.writerIndex(0);
byteBuf.writeBytes(compressedBuffer.asByteBuf());
// set the compression flag here, as we need it when writing the sub-header of this buffer
buffer.setCompressed(true);
}
buffer.setSize(dataLength);
}
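A hedged usage sketch for the new helper, assuming a Flink BufferCompressor instance named `compressor` is configured elsewhere: compress the buffer, then copy the result back in place.

```java
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;

// Assumed wiring: `compressor` and `buffer` come from the surrounding write path.
// If compression does not shrink the data, compressToIntermediateBuffer returns the
// original buffer uncompressed, and setCompressedDataWithoutHeader only refreshes the size.
Buffer maybeCompressed = compressor.compressToIntermediateBuffer(buffer);
BufferUtils.setCompressedDataWithoutHeader(buffer, maybeCompressed);
```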

public static void setBufferHeader(
ByteBuf byteBuf, Buffer.DataType dataType, boolean isCompressed, int dataLength) {
byteBuf.writerIndex(0);