[CELEBORN-1490][CIP-6] Introduce tier producer in celeborn flink client #2733

Closed · wants to merge 2 commits
@@ -33,6 +33,7 @@
import org.apache.celeborn.common.exception.DriverChangedException;
import org.apache.celeborn.common.identity.UserIdentifier;
import org.apache.celeborn.common.protocol.PartitionLocation;
import org.apache.celeborn.plugin.flink.buffer.BufferHeader;
import org.apache.celeborn.plugin.flink.buffer.BufferPacker;
import org.apache.celeborn.plugin.flink.readclient.FlinkShuffleClientImpl;
import org.apache.celeborn.plugin.flink.utils.BufferUtils;
@@ -207,13 +208,13 @@ FlinkShuffleClientImpl getShuffleClient() {
}

/** Writes a piece of data to a subpartition. */
public void write(ByteBuf byteBuf, int subIdx) {
public void write(ByteBuf byteBuf, BufferHeader bufferHeader) {
try {
flinkShuffleClient.pushDataToLocation(
shuffleId,
mapId,
attemptId,
subIdx,
bufferHeader.getSubPartitionId(),
io.netty.buffer.Unpooled.wrappedBuffer(byteBuf.nioBuffer()),
partitionLocation,
() -> byteBuf.release());
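Illustratively, a caller now passes the subpartition id wrapped in a BufferHeader rather than a bare int. A minimal hedged sketch (the `writer` field and the `forward` helper are hypothetical placeholders, not part of this change); it relies on the four-argument BufferHeader constructor introduced below:

void forward(Buffer buffer, int subPartitionId) {
  // Build the header from the Flink buffer's own metadata and call the new write signature.
  writer.write(
      buffer.asByteBuf(),
      new BufferHeader(
          subPartitionId, buffer.getDataType(), buffer.isCompressed(), buffer.getSize()));
}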
@@ -37,6 +37,11 @@ public BufferHeader(Buffer.DataType dataType, boolean isCompressed, int size) {
this(0, 0, 0, size + 2, dataType, isCompressed, size);
}

public BufferHeader(
int subPartitionId, Buffer.DataType dataType, boolean isCompressed, int size) {
this(subPartitionId, 0, 0, size + 2, dataType, isCompressed, size);
}

public BufferHeader(
int subPartitionId,
int attemptId,
@@ -54,6 +59,10 @@ public BufferHeader(
this.size = size;
}

public int getSubPartitionId() {
return subPartitionId;
}

public Buffer.DataType getDataType() {
return dataType;
}
@@ -33,22 +33,31 @@
import org.apache.celeborn.plugin.flink.utils.Utils;
import org.apache.celeborn.reflect.DynMethods;

/** Harness used to pack multiple partial buffers together as a full one. */
/**
* Harness used to pack multiple partial buffers together as a full one. There are two Flink
* integration strategies: Remote Shuffle Service and Hybrid Shuffle. In the Remote Shuffle
* Service strategy, the {@link BufferPacker} receives buffers that contain both shuffle data
* and the Celeborn header. The Hybrid Shuffle strategy employs the subclass {@link
* ReceivedNoHeaderBufferPacker}, which receives buffers containing only shuffle data. The two
* strategies therefore pack buffers with different methods, but the resulting packed buffer
* should have the same layout.
*/
public class BufferPacker {
private static Logger logger = LoggerFactory.getLogger(BufferPacker.class);

public interface BiConsumerWithException<T, U, E extends Throwable> {
void accept(T var1, U var2) throws E;
}

private final BiConsumerWithException<ByteBuf, Integer, InterruptedException> ripeBufferHandler;
protected final BiConsumerWithException<ByteBuf, BufferHeader, InterruptedException>
ripeBufferHandler;

private Buffer cachedBuffer;
protected Buffer cachedBuffer;

private int currentSubIdx = -1;
protected int currentSubIdx = -1;

public BufferPacker(
BiConsumerWithException<ByteBuf, Integer, InterruptedException> ripeBufferHandler) {
BiConsumerWithException<ByteBuf, BufferHeader, InterruptedException> ripeBufferHandler) {
this.ripeBufferHandler = ripeBufferHandler;
}

@@ -71,7 +80,8 @@ public void process(Buffer buffer, int subIdx) throws InterruptedException {
int targetSubIdx = currentSubIdx;
currentSubIdx = subIdx;
logBufferPack(false, dumpedBuffer.getDataType(), dumpedBuffer.readableBytes());
handleRipeBuffer(dumpedBuffer, targetSubIdx);
handleRipeBuffer(
dumpedBuffer, targetSubIdx, dumpedBuffer.getDataType(), dumpedBuffer.isCompressed());
} else {
/**
* this is an optimization. if cachedBuffer can contain other buffer, then other buffer can
@@ -95,12 +105,13 @@ public void process(Buffer buffer, int subIdx) throws InterruptedException {
cachedBuffer = buffer;
logBufferPack(false, dumpedBuffer.getDataType(), dumpedBuffer.readableBytes());

handleRipeBuffer(dumpedBuffer, currentSubIdx);
handleRipeBuffer(
dumpedBuffer, currentSubIdx, dumpedBuffer.getDataType(), dumpedBuffer.isCompressed());
}
}
}

private void logBufferPack(boolean isDrain, Buffer.DataType dataType, int length) {
protected void logBufferPack(boolean isDrain, Buffer.DataType dataType, int length) {
logger.debug(
"isDrain:{}, cachedBuffer pack partition:{} type:{}, length:{}",
isDrain,
@@ -112,15 +123,27 @@ private void logBufferPack(boolean isDrain, Buffer.DataType dataType, int length
public void drain() throws InterruptedException {
if (cachedBuffer != null) {
logBufferPack(true, cachedBuffer.getDataType(), cachedBuffer.readableBytes());
handleRipeBuffer(cachedBuffer, currentSubIdx);
handleRipeBuffer(
cachedBuffer, currentSubIdx, cachedBuffer.getDataType(), cachedBuffer.isCompressed());
}
cachedBuffer = null;
currentSubIdx = -1;
}

private void handleRipeBuffer(Buffer buffer, int subIdx) throws InterruptedException {
protected void handleRipeBuffer(
Buffer buffer, int subIdx, Buffer.DataType dataType, boolean isCompressed)
throws InterruptedException {
// Always set the compress flag to false, because the result buffer generated by {@link
// BufferPacker} needs to be split into multiple buffers in the unpack process. If the
// compress flag were set to true for this result buffer, unpacking would throw an
// exception, as a compressed buffer cannot be sliced.
buffer.setCompressed(false);
ripeBufferHandler.accept(buffer.asByteBuf(), subIdx);
ripeBufferHandler.accept(
buffer.asByteBuf(), new BufferHeader(subIdx, dataType, isCompressed, buffer.getSize()));
}

public boolean isEmpty() {
return cachedBuffer == null;
}

public void close() {
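For orientation, a hedged sketch of how a BufferPacker is wired to a writer: the ripeBufferHandler is the BiConsumerWithException declared above and now receives the ripe ByteBuf together with its BufferHeader (the `writer` and `buffer` names are placeholders, not APIs introduced by this PR):

BufferPacker packer =
    new BufferPacker((byteBuf, bufferHeader) -> writer.write(byteBuf, bufferHeader));
packer.process(buffer, 0); // merge buffers of subpartition 0 until a ripe buffer is handed off
packer.drain();            // flush whatever is still cached
packer.close();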
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.celeborn.plugin.flink.buffer;

import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;

import org.apache.celeborn.plugin.flink.utils.BufferUtils;

/**
* Harness used to pack multiple partial buffers together as a full one. It is currently used in
* the Flink Hybrid Shuffle integration strategy.
*/
public class ReceivedNoHeaderBufferPacker extends BufferPacker {

/** The Flink buffer header of the first cached buffer. */
private BufferHeader firstBufferHeader;

public ReceivedNoHeaderBufferPacker(
BiConsumerWithException<ByteBuf, BufferHeader, InterruptedException> ripeBufferHandler) {
super(ripeBufferHandler);
}

@Override
public void process(Buffer buffer, int subIdx) throws InterruptedException {
if (buffer == null) {
return;
}

if (buffer.readableBytes() == 0) {
buffer.recycleBuffer();
return;
}

if (cachedBuffer == null) {
// cache the first buffer and record the Flink buffer header of the first buffer
cachedBuffer = buffer;
currentSubIdx = subIdx;
firstBufferHeader =
new BufferHeader(subIdx, buffer.getDataType(), buffer.isCompressed(), buffer.getSize());
} else if (currentSubIdx != subIdx) {
// drain the previous cached buffer and cache current buffer
Buffer dumpedBuffer = cachedBuffer;
cachedBuffer = buffer;
int targetSubIdx = currentSubIdx;
currentSubIdx = subIdx;
logBufferPack(false, dumpedBuffer.getDataType(), dumpedBuffer.readableBytes());
handleRipeBuffer(
dumpedBuffer, targetSubIdx, dumpedBuffer.getDataType(), dumpedBuffer.isCompressed());
firstBufferHeader =
new BufferHeader(subIdx, buffer.getDataType(), buffer.isCompressed(), buffer.getSize());
} else {
int bufferHeaderLength = BufferUtils.HEADER_LENGTH - BufferUtils.HEADER_LENGTH_PREFIX;
if (cachedBuffer.readableBytes() + buffer.readableBytes() + bufferHeaderLength
<= cachedBuffer.getMaxCapacity() - BufferUtils.HEADER_LENGTH) {
// if the cached buffer can hold the current buffer, then pack the current buffer into the
// cached buffer
ByteBuf byteBuf = cachedBuffer.asByteBuf();
byteBuf.writeByte(buffer.getDataType().ordinal());
byteBuf.writeBoolean(buffer.isCompressed());
byteBuf.writeInt(buffer.getSize());
byteBuf.writeBytes(buffer.asByteBuf(), 0, buffer.readableBytes());
logBufferPack(false, buffer.getDataType(), buffer.readableBytes() + bufferHeaderLength);

buffer.recycleBuffer();
} else {
// if the cached buffer cannot hold the current buffer, drain the cached buffer and cache
// the current buffer
Buffer dumpedBuffer = cachedBuffer;
cachedBuffer = buffer;
logBufferPack(false, dumpedBuffer.getDataType(), dumpedBuffer.readableBytes());

handleRipeBuffer(
dumpedBuffer, currentSubIdx, dumpedBuffer.getDataType(), dumpedBuffer.isCompressed());
firstBufferHeader =
new BufferHeader(subIdx, buffer.getDataType(), buffer.isCompressed(), buffer.getSize());
}
}
}

@Override
protected void handleRipeBuffer(
Buffer buffer, int subIdx, Buffer.DataType dataType, boolean isCompressed)
throws InterruptedException {
if (buffer == null || buffer.readableBytes() == 0) {
return;
}
// Always set the compress flag to false, because this buffer contains the Celeborn header and
// multiple Flink data buffers. Keeping the flag false is crucial: during the unpacking process
// this buffer must be sliced to extract the Flink data buffers, and the Flink
// {@link NetworkBuffer} cannot correctly slice a compressed buffer.
buffer.setCompressed(false);
ripeBufferHandler.accept(buffer.asByteBuf(), firstBufferHeader);
}
}
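The merged buffer built by process() above interleaves a small inline sub-header with every packed Flink buffer. A hedged sketch of that per-entry layout, mirroring the writeByte/writeBoolean/writeInt calls in the method (the helper name is illustrative only):

// Appends one Flink buffer to an already-cached merged buffer, using the same inline
// sub-header layout as process(): 1 byte data type, 1 byte compressed flag, 4 bytes length.
static void appendPackedEntry(ByteBuf target, Buffer entry) {
  target.writeByte(entry.getDataType().ordinal());
  target.writeBoolean(entry.isCompressed());
  target.writeInt(entry.getSize());
  target.writeBytes(entry.asByteBuf(), 0, entry.readableBytes());
}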
@@ -59,6 +59,26 @@ public static void setCompressedDataWithHeader(Buffer buffer, Buffer compressedB
buffer.setSize(dataLength + HEADER_LENGTH);
}

/**
* Used in the Hybrid Shuffle integration strategy, where the buffer contains only data (no
* Celeborn header). Copies the data of the compressed buffer into the origin buffer.
*/
public static void setCompressedDataWithoutHeader(Buffer buffer, Buffer compressedBuffer) {
checkArgument(buffer != null, "Must be not null.");
checkArgument(buffer.getReaderIndex() == 0, "Illegal reader index.");

boolean isCompressed = compressedBuffer != null && compressedBuffer.isCompressed();
int dataLength = isCompressed ? compressedBuffer.readableBytes() : buffer.readableBytes();
ByteBuf byteBuf = buffer.asByteBuf();
if (isCompressed) {
byteBuf.writerIndex(0);
byteBuf.writeBytes(compressedBuffer.asByteBuf());
// set the compression flag here, as we need it when writing the sub-header of this buffer
buffer.setCompressed(true);
}
buffer.setSize(dataLength);
}

public static void setBufferHeader(
ByteBuf byteBuf, Buffer.DataType dataType, boolean isCompressed, int dataLength) {
byteBuf.writerIndex(0);
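A hedged usage sketch of the new setCompressedDataWithoutHeader helper, assuming `compressor` is a Flink BufferCompressor and `origin` is a data buffer that already holds uncompressed shuffle data (both placeholders):

Buffer compressed = compressor.compressToIntermediateBuffer(origin);
BufferUtils.setCompressedDataWithoutHeader(origin, compressed);
// Unlike setCompressedDataWithHeader, the compressed bytes are copied starting at offset 0
// (no HEADER_LENGTH gap), and only the buffer size and compression flag are updated.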