Skip to content

Commit

Permalink
Using socket_posix to replace socket_starboard
Browse files Browse the repository at this point in the history
Using base/starboard implementation to work with net/tcp&udp.

Tested cobalt on Linux-x64 and evergreen-x64.

The waiter double add check should be removed later.

Reverted unnecessary function name changes regarding pump.
Removed musl socket changes.

Todo:
  Cobalt on Win32
  on linux and evergreen, net_unittests test cases fail
     TransportClientSocketTest.FullDuplex_WriteFirst
     TransportClientSocketTest.FullDuplex_ReadFirst
  on Windows
     net_unittests test case fails
       CertVerifyProcBuiltinTest.CRLNotCheckedForKnownRoots
     Cobalt hangs

b/302741384
  • Loading branch information
maxz-lab committed Aug 16, 2024
1 parent e25e899 commit 9ffed3b
Show file tree
Hide file tree
Showing 34 changed files with 578 additions and 60 deletions.
141 changes: 139 additions & 2 deletions base/message_loop/message_pump_io_starboard.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@

#include "base/message_loop/message_pump_io_starboard.h"

#if SB_API_VERSION >= 16
#include <sys/socket.h>
#include <sys/stat.h>
#include <netinet/in.h>
#endif

#include "base/auto_reset.h"
#include "base/compiler_specific.h"
#include "base/logging.h"
Expand All @@ -28,17 +34,46 @@ namespace base {
MessagePumpIOStarboard::SocketWatcher::SocketWatcher(const Location& from_here)
: created_from_location_(from_here),
interests_(kSbSocketWaiterInterestNone),
#if SB_API_VERSION >= 16
socket_(-1),
#else
socket_(kSbSocketInvalid),
#endif
pump_(nullptr),
watcher_(nullptr),
weak_factory_(this) {}

MessagePumpIOStarboard::SocketWatcher::~SocketWatcher() {
#if SB_API_VERSION >= 16
if (socket_ >= 0) {
StopWatchingFileDescriptor();
}
#else
if (SbSocketIsValid(socket_)) {
StopWatchingSocket();
}
#endif
}

#if SB_API_VERSION >= 16
bool MessagePumpIOStarboard::SocketWatcher::StopWatchingFileDescriptor() {
watcher_ = nullptr;
interests_ = kSbSocketWaiterInterestNone;
if (socket_ < 0) {
pump_ = nullptr;
// If this watcher is not watching anything, no-op and return success.
return true;
}
int socket = Release();
bool result = true;
if (socket >= 0) {
DCHECK(pump_);
result = pump_->StopWatchingFileDescriptor(socket);
}
pump_ = nullptr;
return result;
}
#else
bool MessagePumpIOStarboard::SocketWatcher::StopWatchingSocket() {
watcher_ = nullptr;
interests_ = kSbSocketWaiterInterestNone;
Expand All @@ -57,21 +92,59 @@ bool MessagePumpIOStarboard::SocketWatcher::StopWatchingSocket() {
pump_ = nullptr;
return result;
}
#endif

#if SB_API_VERSION >= 16
void MessagePumpIOStarboard::SocketWatcher::Init(int socket,
bool persistent) {
DCHECK(socket >= 0);
DCHECK(socket_ < 0);
#else
void MessagePumpIOStarboard::SocketWatcher::Init(SbSocket socket,
bool persistent) {
DCHECK(socket);
DCHECK(!socket_);
#endif
socket_ = socket;
persistent_ = persistent;
}

#if SB_API_VERSION >= 16
int MessagePumpIOStarboard::SocketWatcher::Release() {
int socket = socket_;
socket_ = -1;
return socket;
}
#else
SbSocket MessagePumpIOStarboard::SocketWatcher::Release() {
SbSocket socket = socket_;
socket_ = kSbSocketInvalid;
return socket;
}
#endif

#if SB_API_VERSION >= 16
void MessagePumpIOStarboard::SocketWatcher::OnFileCanReadWithoutBlocking(
int socket,
MessagePumpIOStarboard* pump) {
if (!watcher_)
return;
pump->WillProcessIOEvent();
watcher_->OnFileCanReadWithoutBlocking(socket);
pump->DidProcessIOEvent();
}

void MessagePumpIOStarboard::SocketWatcher::OnFileCanWriteWithoutBlocking(
int socket,
MessagePumpIOStarboard* pump) {
if (!watcher_)
return;
pump->WillProcessIOEvent();
watcher_->OnFileCanWriteWithoutBlocking(socket);
pump->DidProcessIOEvent();
}

#else
void MessagePumpIOStarboard::SocketWatcher::OnSocketReadyToRead(
SbSocket socket,
MessagePumpIOStarboard* pump) {
Expand All @@ -91,6 +164,7 @@ void MessagePumpIOStarboard::SocketWatcher::OnSocketReadyToWrite(
watcher_->OnSocketReadyToWrite(socket);
pump->DidProcessIOEvent();
}
#endif

MessagePumpIOStarboard::MessagePumpIOStarboard()
: keep_running_(true),
Expand All @@ -102,12 +176,21 @@ MessagePumpIOStarboard::~MessagePumpIOStarboard() {
SbSocketWaiterDestroy(waiter_);
}

#if SB_API_VERSION >= 16
bool MessagePumpIOStarboard::WatchFileDescriptor(int socket,
bool persistent,
int mode,
SocketWatcher* controller,
Watcher* delegate) {
DCHECK(socket >= 0);
#else
bool MessagePumpIOStarboard::Watch(SbSocket socket,
bool persistent,
int mode,
SocketWatcher* controller,
Watcher* delegate) {
DCHECK(SbSocketIsValid(socket));
#endif
DCHECK(controller);
DCHECK(delegate);
DCHECK(mode == WATCH_READ || mode == WATCH_WRITE || mode == WATCH_READ_WRITE);
Expand All @@ -123,8 +206,13 @@ bool MessagePumpIOStarboard::Watch(SbSocket socket,
interests |= kSbSocketWaiterInterestWrite;
}

#if SB_API_VERSION >= 16
int old_socket = controller->Release();
if (old_socket >= 0) {
#else
SbSocket old_socket = controller->Release();
if (SbSocketIsValid(old_socket)) {
#endif
// It's illegal to use this function to listen on 2 separate fds with the
// same |controller|.
if (old_socket != socket) {
Expand All @@ -141,12 +229,24 @@ bool MessagePumpIOStarboard::Watch(SbSocket socket,
interests |= old_interest_mask;

// Must disarm the event before we can reuse it.
#if SB_API_VERSION >= 16
SbPosixSocketWaiterRemove(waiter_, old_socket);
#else
SbSocketWaiterRemove(waiter_, old_socket);
#endif // SB_API_VERSION >= 16
}

// Set current interest mask and waiter for this event.
if (!SbSocketWaiterAdd(waiter_, socket, controller,
OnSocketWaiterNotification, interests, persistent)) {
bool result = false;
#if SB_API_VERSION >= 16
result = SbPosixSocketWaiterAdd(waiter_, socket, controller,
OnPosixSocketWaiterNotification, interests, persistent);

#else
result = SbSocketWaiterAdd(waiter_, socket, controller,
OnSocketWaiterNotification, interests, persistent);
#endif // SB_API_VERSION >= 16
if (result == false) {
return false;
}

Expand All @@ -157,9 +257,15 @@ bool MessagePumpIOStarboard::Watch(SbSocket socket,
return true;
}

#if SB_API_VERSION >= 16
bool MessagePumpIOStarboard::StopWatchingFileDescriptor(int socket) {
return SbPosixSocketWaiterRemove(waiter_, socket);
}
#else
bool MessagePumpIOStarboard::StopWatching(SbSocket socket) {
return SbSocketWaiterRemove(waiter_, socket);
}
#endif // SB_API_VERSION >= 16 || SB_IS(MODULAR)

void MessagePumpIOStarboard::AddIOObserver(IOObserver* obs) {
io_observers_.AddObserver(obs);
Expand Down Expand Up @@ -252,6 +358,36 @@ void MessagePumpIOStarboard::DidProcessIOEvent() {
}
}

#if SB_API_VERSION >= 16

// static
void MessagePumpIOStarboard::OnPosixSocketWaiterNotification(SbSocketWaiter waiter,
int socket,
void* context,
int ready_interests) {
base::WeakPtr<SocketWatcher> controller =
static_cast<SocketWatcher*>(context)->weak_factory_.GetWeakPtr();
DCHECK(controller.get());

MessagePumpIOStarboard* pump = controller->pump();
pump->processed_io_events_ = true;

// If not persistent, the watch has been released at this point.
if (!controller->persistent()) {
controller->Release();
}

if (ready_interests & kSbSocketWaiterInterestWrite) {
controller->OnFileCanWriteWithoutBlocking(socket, pump);
}

// Check |controller| in case it's been deleted previously.
if (controller.get() && ready_interests & kSbSocketWaiterInterestRead) {
controller->OnFileCanReadWithoutBlocking(socket, pump);
}
}

#else
// static
void MessagePumpIOStarboard::OnSocketWaiterNotification(SbSocketWaiter waiter,
SbSocket socket,
Expand Down Expand Up @@ -279,4 +415,5 @@ void MessagePumpIOStarboard::OnSocketWaiterNotification(SbSocketWaiter waiter,
}
}

#endif // SB_API_VERSION >= 16
} // namespace base
43 changes: 43 additions & 0 deletions base/message_loop/message_pump_io_starboard.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
#ifndef BASE_MESSAGE_PUMP_IO_STARBOARD_H_
#define BASE_MESSAGE_PUMP_IO_STARBOARD_H_

#include "starboard/configuration.h"

#include "base/compiler_specific.h"
#include "base/memory/weak_ptr.h"
#include "base/message_loop/message_pump.h"
Expand Down Expand Up @@ -49,8 +51,13 @@ class BASE_EXPORT MessagePumpIOStarboard : public MessagePump {
public:
// These methods are called from MessageLoop::Run when a socket can be
// interacted with without blocking.
#if SB_API_VERSION >= 16
virtual void OnFileCanReadWithoutBlocking(int socket) {}
virtual void OnFileCanWriteWithoutBlocking(int socket) {}
#else
virtual void OnSocketReadyToRead(SbSocket socket) {}
virtual void OnSocketReadyToWrite(SbSocket socket) {}
#endif

protected:
virtual ~Watcher() {}
Expand All @@ -70,7 +77,11 @@ class BASE_EXPORT MessagePumpIOStarboard : public MessagePump {

// Stops watching the socket, always safe to call. No-op if there's nothing
// to do.
#if SB_API_VERSION >= 16
bool StopWatchingFileDescriptor();
#else
bool StopWatchingSocket();
#endif

bool persistent() const { return persistent_; }

Expand All @@ -79,8 +90,13 @@ class BASE_EXPORT MessagePumpIOStarboard : public MessagePump {
friend class MessagePumpIOStarboardTest;

// Called by MessagePumpIOStarboard.
#if SB_API_VERSION >= 16
void Init(int socket, bool persistent);
int Release();
#else
void Init(SbSocket socket, bool persistent);
SbSocket Release();
#endif

int interests() const { return interests_; }
void set_interests(int interests) { interests_ = interests; }
Expand All @@ -90,12 +106,21 @@ class BASE_EXPORT MessagePumpIOStarboard : public MessagePump {

void set_watcher(Watcher* watcher) { watcher_ = watcher; }

#if SB_API_VERSION >= 16
void OnFileCanReadWithoutBlocking(int socket, MessagePumpIOStarboard* pump);
void OnFileCanWriteWithoutBlocking(int socket, MessagePumpIOStarboard* pump);
#else
void OnSocketReadyToRead(SbSocket socket, MessagePumpIOStarboard* pump);
void OnSocketReadyToWrite(SbSocket socket, MessagePumpIOStarboard* pump);
#endif

const Location created_from_location_;
int interests_;
#if SB_API_VERSION >= 16
int socket_;
#else
SbSocket socket_;
#endif
bool persistent_;
MessagePumpIOStarboard* pump_;
Watcher* watcher_;
Expand Down Expand Up @@ -123,6 +148,16 @@ class BASE_EXPORT MessagePumpIOStarboard : public MessagePump {
// If an error occurs while calling this method in a cumulative fashion, the
// event previously attached to |controller| is aborted. Returns true on
// success. Must be called on the same thread the message_pump is running on.
#if SB_API_VERSION >= 16
bool WatchFileDescriptor(int socket,
bool persistent,
int mode,
SocketWatcher* controller,
Watcher* delegate);

// Stops watching the socket.
bool StopWatchingFileDescriptor(int socket);
#else
bool Watch(SbSocket socket,
bool persistent,
int mode,
Expand All @@ -131,6 +166,7 @@ class BASE_EXPORT MessagePumpIOStarboard : public MessagePump {

// Stops watching the socket.
bool StopWatching(SbSocket socket);
#endif

void AddIOObserver(IOObserver* obs);
void RemoveIOObserver(IOObserver* obs);
Expand All @@ -149,10 +185,17 @@ class BASE_EXPORT MessagePumpIOStarboard : public MessagePump {

// Called by SbSocketWaiter to tell us a registered socket can be read and/or
// written to.
#if SB_API_VERSION >= 16
static void OnPosixSocketWaiterNotification(SbSocketWaiter waiter,
int socket,
void* context,
int ready_interests);
#else
static void OnSocketWaiterNotification(SbSocketWaiter waiter,
SbSocket socket,
void* context,
int ready_interests);
#endif

bool should_quit() const { return !keep_running_; }

Expand Down
Loading

0 comments on commit 9ffed3b

Please sign in to comment.