Skip to content

Commit

Permalink
Using socket_posix to replace socket_starboard
Browse files Browse the repository at this point in the history
Using base/starboard implementation to work with net/tcp&udp.

Tested cobalt on Linux-x64 and evergreen-x64.

Todo:
  Win32/XB1/PS5/Apple

b/302741384
  • Loading branch information
maxz-lab committed Aug 1, 2024
1 parent e25e899 commit aaf0f38
Show file tree
Hide file tree
Showing 52 changed files with 570 additions and 169 deletions.
1 change: 1 addition & 0 deletions base/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -972,6 +972,7 @@ component("base") {
"files/file_path_watcher_stub.cc",
"files/file_starboard.cc",
"files/file_util_starboard.cc",
# "files/file_util_posix.cc",
"files/memory_mapped_file_starboard.cc",
"memory/page_size_starboard.cc",
"memory/platform_shared_memory_mapper_starboard.cc",
Expand Down
8 changes: 4 additions & 4 deletions base/files/file_descriptor_watcher_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ class FileDescriptorWatcher::Controller::Watcher
friend class FileDescriptorWatcher;

// MessagePumpForIO::FdWatcher:
void OnFileCanReadWithoutBlocking(int fd) override;
void OnFileCanWriteWithoutBlocking(int fd) override;
void OnSocketReadyToRead(int fd) override;
void OnSocketReadyToWrite(int fd) override;

// CurrentThread::DestructionObserver:
void WillDestroyCurrentMessageLoop() override;
Expand Down Expand Up @@ -122,7 +122,7 @@ void FileDescriptorWatcher::Controller::Watcher::StartWatching() {
}
}

void FileDescriptorWatcher::Controller::Watcher::OnFileCanReadWithoutBlocking(
void FileDescriptorWatcher::Controller::Watcher::OnSocketReadyToRead(
int fd) {
DCHECK_EQ(fd_, fd);
DCHECK_EQ(MessagePumpForIO::WATCH_READ, mode_);
Expand All @@ -133,7 +133,7 @@ void FileDescriptorWatcher::Controller::Watcher::OnFileCanReadWithoutBlocking(
FROM_HERE, BindOnce(&Controller::RunCallback, controller_));
}

void FileDescriptorWatcher::Controller::Watcher::OnFileCanWriteWithoutBlocking(
void FileDescriptorWatcher::Controller::Watcher::OnSocketReadyToWrite(
int fd) {
DCHECK_EQ(fd_, fd);
DCHECK_EQ(MessagePumpForIO::WATCH_WRITE, mode_);
Expand Down
11 changes: 11 additions & 0 deletions base/files/file_util_starboard.cc
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,17 @@ FilePath MakeAbsoluteFilePath(const FilePath& input) {
return input;
}

bool SetNonBlocking(int fd) {
const int flags = fcntl(fd, F_GETFL);
if (flags == -1)
return false;
if (flags & O_NONBLOCK)
return true;
if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
return false;
return true;
}

namespace internal {

bool MoveUnsafe(const FilePath& from_path, const FilePath& to_path) {
Expand Down
12 changes: 6 additions & 6 deletions base/message_loop/fd_watch_controller_posix_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ class FdWatchControllerPosixTest : public testing::Test {

class TestHandler : public MessagePumpForIO::FdWatcher {
public:
void OnFileCanReadWithoutBlocking(int fd) override {
void OnSocketReadyToRead(int fd) override {
watcher_to_delete_ = nullptr;
is_readable_ = true;
RunLoop::QuitCurrentWhenIdleDeprecated();
}
void OnFileCanWriteWithoutBlocking(int fd) override {
void OnSocketReadyToWrite(int fd) override {
watcher_to_delete_ = nullptr;
is_writable_ = true;
RunLoop::QuitCurrentWhenIdleDeprecated();
Expand Down Expand Up @@ -102,7 +102,7 @@ class CallClosureHandler : public MessagePumpForIO::FdWatcher {
}

// base::WatchableIOMessagePumpPosix::FdWatcher:
void OnFileCanReadWithoutBlocking(int fd) override {
void OnSocketReadyToRead(int fd) override {
// Empty the pipe buffer to reset the event. Otherwise libevent
// implementation of MessageLoop may call the event handler again even if
// |read_closure_| below quits the RunLoop.
Expand All @@ -118,7 +118,7 @@ class CallClosureHandler : public MessagePumpForIO::FdWatcher {
std::move(read_closure_).Run();
}

void OnFileCanWriteWithoutBlocking(int fd) override {
void OnSocketReadyToWrite(int fd) override {
ASSERT_FALSE(write_closure_.is_null());
std::move(write_closure_).Run();
}
Expand Down Expand Up @@ -209,7 +209,7 @@ class ReaderWriterHandler : public MessagePumpForIO::FdWatcher {
ReaderWriterHandler& operator=(const ReaderWriterHandler&) = delete;

// base::WatchableIOMessagePumpPosix::FdWatcher:
void OnFileCanReadWithoutBlocking(int fd) override {
void OnSocketReadyToRead(int fd) override {
if (when_ == kOnReadEvent) {
DoAction();
} else {
Expand All @@ -218,7 +218,7 @@ class ReaderWriterHandler : public MessagePumpForIO::FdWatcher {
}
}

void OnFileCanWriteWithoutBlocking(int fd) override {
void OnSocketReadyToWrite(int fd) override {
if (when_ == kOnWriteEvent) {
DoAction();
} else {
Expand Down
4 changes: 2 additions & 2 deletions base/message_loop/message_pump_fuchsia.cc
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,9 @@ void MessagePumpFuchsia::FdWatchController::OnZxHandleSignalled(
// work that would touch |this|.
bool* was_stopped = was_stopped_;
if (filtered_events & FDIO_EVT_WRITABLE)
watcher_->OnFileCanWriteWithoutBlocking(fd_);
watcher_->OnSocketReadyToWrite(fd_);
if (!*was_stopped && (filtered_events & FDIO_EVT_READABLE))
watcher_->OnFileCanReadWithoutBlocking(fd_);
watcher_->OnSocketReadyToRead(fd_);

// Don't add additional work here without checking |*was_stopped_| again.
}
Expand Down
4 changes: 2 additions & 2 deletions base/message_loop/message_pump_glib.cc
Original file line number Diff line number Diff line change
Expand Up @@ -488,14 +488,14 @@ void MessagePumpGlib::FdWatchController::NotifyCanRead() {
if (!watcher_)
return;
DCHECK(poll_fd_);
watcher_->OnFileCanReadWithoutBlocking(poll_fd_->fd);
watcher_->OnSocketReadyToRead(poll_fd_->fd);
}

void MessagePumpGlib::FdWatchController::NotifyCanWrite() {
if (!watcher_)
return;
DCHECK(poll_fd_);
watcher_->OnFileCanWriteWithoutBlocking(poll_fd_->fd);
watcher_->OnSocketReadyToWrite(poll_fd_->fd);
}

bool MessagePumpGlib::WatchFileDescriptor(int fd,
Expand Down
22 changes: 11 additions & 11 deletions base/message_loop/message_pump_glib_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -658,8 +658,8 @@ class BaseWatcher : public MessagePumpGlib::FdWatcher {
~BaseWatcher() override = default;

// base:MessagePumpGlib::FdWatcher interface
void OnFileCanReadWithoutBlocking(int /* fd */) override { NOTREACHED(); }
void OnFileCanWriteWithoutBlocking(int /* fd */) override { NOTREACHED(); }
void OnSocketReadyToRead(int /* fd */) override { NOTREACHED(); }
void OnSocketReadyToWrite(int /* fd */) override { NOTREACHED(); }

protected:
raw_ptr<MessagePumpGlib::FdWatchController> controller_;
Expand All @@ -676,7 +676,7 @@ class DeleteWatcher : public BaseWatcher {

bool HasController() const { return !!controller_; }

void OnFileCanWriteWithoutBlocking(int /* fd */) override {
void OnSocketReadyToWrite(int /* fd */) override {
ClearController();
}

Expand All @@ -698,7 +698,7 @@ class StopWatcher : public BaseWatcher {

~StopWatcher() override = default;

void OnFileCanWriteWithoutBlocking(int /* fd */) override {
void OnSocketReadyToWrite(int /* fd */) override {
controller_->StopWatchingFileDescriptor();
}
};
Expand All @@ -717,14 +717,14 @@ class NestedPumpWatcher : public MessagePumpGlib::FdWatcher {
NestedPumpWatcher() = default;
~NestedPumpWatcher() override = default;

void OnFileCanReadWithoutBlocking(int /* fd */) override {
void OnSocketReadyToRead(int /* fd */) override {
RunLoop runloop;
SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
FROM_HERE, BindOnce(&QuitMessageLoopAndStart, runloop.QuitClosure()));
runloop.Run();
}

void OnFileCanWriteWithoutBlocking(int /* fd */) override {}
void OnSocketReadyToWrite(int /* fd */) override {}
};

class QuitWatcher : public DeleteWatcher {
Expand All @@ -734,7 +734,7 @@ class QuitWatcher : public DeleteWatcher {
: DeleteWatcher(std::move(controller)),
quit_closure_(std::move(quit_closure)) {}

void OnFileCanReadWithoutBlocking(int fd) override {
void OnSocketReadyToRead(int fd) override {
ClearController();
if (quit_closure_)
std::move(quit_closure_).Run();
Expand All @@ -753,9 +753,9 @@ void WriteFDWrapper(const int fd,

} // namespace

// Tests that MessagePumpGlib::FdWatcher::OnFileCanReadWithoutBlocking is not
// Tests that MessagePumpGlib::FdWatcher::OnSocketReadyToRead is not
// called for a READ_WRITE event, and that the controller is destroyed in
// OnFileCanWriteWithoutBlocking callback.
// OnSocketReadyToWrite callback.
TEST_F(MessagePumpGLibFdWatchTest, DeleteWatcher) {
auto pump = std::make_unique<MessagePumpGlib>();
auto controller_ptr =
Expand All @@ -771,9 +771,9 @@ TEST_F(MessagePumpGLibFdWatchTest, DeleteWatcher) {
EXPECT_FALSE(watcher.HasController());
}

// Tests that MessagePumpGlib::FdWatcher::OnFileCanReadWithoutBlocking is not
// Tests that MessagePumpGlib::FdWatcher::OnSocketReadyToRead is not
// called for a READ_WRITE event, when the watcher calls
// StopWatchingFileDescriptor in OnFileCanWriteWithoutBlocking callback.
// StopWatchingFileDescriptor in OnSocketReadyToWrite callback.
TEST_F(MessagePumpGLibFdWatchTest, StopWatcher) {
std::unique_ptr<MessagePumpGlib> pump(new MessagePumpGlib);
MessagePumpGlib::FdWatchController controller(FROM_HERE);
Expand Down
12 changes: 6 additions & 6 deletions base/message_loop/message_pump_io_ios.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,18 @@ void MessagePumpIOSForIO::FdWatchController::Init(CFFileDescriptorRef fdref,
fd_source_.reset(fd_source);
}

void MessagePumpIOSForIO::FdWatchController::OnFileCanReadWithoutBlocking(
void MessagePumpIOSForIO::FdWatchController::OnSocketReadyToRead(
int fd,
MessagePumpIOSForIO* pump) {
DCHECK(callback_types_ & kCFFileDescriptorReadCallBack);
watcher_->OnFileCanReadWithoutBlocking(fd);
watcher_->OnSocketReadyToRead(fd);
}

void MessagePumpIOSForIO::FdWatchController::OnFileCanWriteWithoutBlocking(
void MessagePumpIOSForIO::FdWatchController::OnSocketReadyToWrite(
int fd,
MessagePumpIOSForIO* pump) {
DCHECK(callback_types_ & kCFFileDescriptorWriteCallBack);
watcher_->OnFileCanWriteWithoutBlocking(fd);
watcher_->OnSocketReadyToWrite(fd);
}

MessagePumpIOSForIO::MessagePumpIOSForIO() : weak_factory_(this) {
Expand Down Expand Up @@ -161,7 +161,7 @@ void MessagePumpIOSForIO::HandleFdIOEvent(CFFileDescriptorRef fdref,
MessagePumpIOSForIO* pump = controller->pump().get();
DCHECK(pump);
if (callback_types & kCFFileDescriptorWriteCallBack)
controller->OnFileCanWriteWithoutBlocking(fd, pump);
controller->OnSocketReadyToWrite(fd, pump);

// Perform the read callback only if the file descriptor has not been
// invalidated in the write callback. As |FdWatchController| invalidates
Expand All @@ -170,7 +170,7 @@ void MessagePumpIOSForIO::HandleFdIOEvent(CFFileDescriptorRef fdref,
if (callback_types & kCFFileDescriptorReadCallBack &&
CFFileDescriptorIsValid(fdref)) {
DCHECK_EQ(fdref, controller->fdref_.get());
controller->OnFileCanReadWithoutBlocking(fd, pump);
controller->OnSocketReadyToRead(fd, pump);
}

// Re-enable callbacks after the read/write if the file descriptor is still
Expand Down
4 changes: 2 additions & 2 deletions base/message_loop/message_pump_io_ios.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ class BASE_EXPORT MessagePumpIOSForIO : public MessagePumpNSRunLoop,

void set_watcher(FdWatcher* watcher) { watcher_ = watcher; }

void OnFileCanReadWithoutBlocking(int fd, MessagePumpIOSForIO* pump);
void OnFileCanWriteWithoutBlocking(int fd, MessagePumpIOSForIO* pump);
void OnSocketReadyToRead(int fd, MessagePumpIOSForIO* pump);
void OnSocketReadyToWrite(int fd, MessagePumpIOSForIO* pump);

bool is_persistent_ = false; // false if this event is one-shot.
base::mac::ScopedCFFileDescriptorRef fdref_;
Expand Down
12 changes: 6 additions & 6 deletions base/message_loop/message_pump_io_ios_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ class StupidWatcher : public MessagePumpIOSForIO::FdWatcher {
~StupidWatcher() override {}

// base:MessagePumpIOSForIO::FdWatcher interface
void OnFileCanReadWithoutBlocking(int fd) override {}
void OnFileCanWriteWithoutBlocking(int fd) override {}
void OnSocketReadyToRead(int fd) override {}
void OnSocketReadyToWrite(int fd) override {}
};

class BaseWatcher : public MessagePumpIOSForIO::FdWatcher {
Expand All @@ -70,9 +70,9 @@ class BaseWatcher : public MessagePumpIOSForIO::FdWatcher {
~BaseWatcher() override {}

// MessagePumpIOSForIO::FdWatcher interface
void OnFileCanReadWithoutBlocking(int /* fd */) override { NOTREACHED(); }
void OnSocketReadyToRead(int /* fd */) override { NOTREACHED(); }

void OnFileCanWriteWithoutBlocking(int /* fd */) override { NOTREACHED(); }
void OnSocketReadyToWrite(int /* fd */) override { NOTREACHED(); }

protected:
MessagePumpIOSForIO::FdWatchController* controller_;
Expand All @@ -85,7 +85,7 @@ class DeleteWatcher : public BaseWatcher {

~DeleteWatcher() override { DCHECK(!controller_); }

void OnFileCanWriteWithoutBlocking(int /* fd */) override {
void OnSocketReadyToWrite(int /* fd */) override {
DCHECK(controller_);
delete controller_;
controller_ = NULL;
Expand Down Expand Up @@ -115,7 +115,7 @@ class StopWatcher : public BaseWatcher {

~StopWatcher() override {}

void OnFileCanWriteWithoutBlocking(int /* fd */) override {
void OnSocketReadyToWrite(int /* fd */) override {
controller_->StopWatchingFileDescriptor();
if (fd_to_start_watching_ >= 0) {
pump_->WatchFileDescriptor(fd_to_start_watching_,
Expand Down
Loading

0 comments on commit aaf0f38

Please sign in to comment.