From ad5e6e276c7ed39764bd546e77f8576ead81d23b Mon Sep 17 00:00:00 2001 From: Igor Abdrakhimov Date: Thu, 12 Sep 2024 12:49:00 -0700 Subject: [PATCH] Handle pipe closing --- CMakeLists.txt | 1 + include/aws/io/io.h | 4 + source/posix/pipe.c | 13 ++ source/qnx/ionotify_event_loop.c | 375 +++++++++++++++++-------------- tests/pipe_test.c | 5 + 5 files changed, 233 insertions(+), 165 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 0c7c501c9..ee8c1f6e0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -153,6 +153,7 @@ elseif(CMAKE_SYSTEM_NAME STREQUAL "QNX") ) set(EVENT_LOOP_DEFINE "ON_EVENT_WITH_RESULT") set(USE_S2N ON) + list(APPEND PLATFORM_LIBS "socket") endif() if (BYO_CRYPTO) diff --git a/include/aws/io/io.h b/include/aws/io/io.h index 2baf7db1a..885afebb9 100644 --- a/include/aws/io/io.h +++ b/include/aws/io/io.h @@ -25,7 +25,11 @@ struct aws_event_loop; struct aws_io_handle_io_op_result { size_t read_bytes; size_t written_bytes; + /** Error codes representing generic errors happening on I/O handles. */ + int error_code; + /** Error codes specific to reading operations. */ int read_error_code; + /** Error codes specific to writing operations. 
*/ int write_error_code; }; diff --git a/source/posix/pipe.c b/source/posix/pipe.c index 319e6621b..bb4bfeacb 100644 --- a/source/posix/pipe.c +++ b/source/posix/pipe.c @@ -6,6 +6,7 @@ #include #include +#include #ifdef __GLIBC__ # define __USE_GNU @@ -290,6 +291,16 @@ int aws_pipe_read(struct aws_pipe_read_end *read_end, struct aws_byte_buf *dst_b } return s_raise_posix_error(errno_value); } +#if AWS_USE_ON_EVENT_WITH_RESULT + else if (read_val == 0) { + if (read_impl->handle.update_io_result) { + struct aws_io_handle_io_op_result io_op_result; + memset(&io_op_result, 0, sizeof(struct aws_io_handle_io_op_result)); + io_op_result.error_code = AWS_IO_SOCKET_CLOSED; + read_impl->handle.update_io_result(read_impl->event_loop, &read_impl->handle, &io_op_result); + } + } +#endif /* Success */ dst_buffer->len += read_val; @@ -310,6 +321,8 @@ static void s_read_end_on_event( (void)event_loop; (void)handle; + AWS_LOGF_TRACE(12, "=== s_read_end_on_event is called"); + /* Note that it should be impossible for this to run after read-end has been unsubscribed or cleaned up */ struct aws_pipe_read_end *read_end = user_data; struct read_end_impl *read_impl = read_end->impl_data; diff --git a/source/qnx/ionotify_event_loop.c b/source/qnx/ionotify_event_loop.c index c449d583f..665f07467 100644 --- a/source/qnx/ionotify_event_loop.c +++ b/source/qnx/ionotify_event_loop.c @@ -79,34 +79,39 @@ struct ionotify_loop { uint32_t last_handle_id; }; +/* Data associated with a subscribed I/O handle. */ struct ionotify_event_data { struct aws_allocator *alloc; struct aws_io_handle *handle; struct aws_event_loop *event_loop; aws_event_loop_on_event_fn *on_event; int events_subscribed; + int latest_io_operation_error_code; /* Connection opened on the pulse channel. Used to send pulses to the main event loop. 
 */ int pulse_connection_id; struct sigevent event; void *user_data; struct aws_task subscribe_task; struct aws_task cleanup_task; + /* ID that can fit into pulse user data (only _NOTIFY_COND_MASK bits in this field can be used). */ uint32_t handle_id; - /* false when handle is unsubscribed, but this struct hasn't been cleaned up yet */ + /* False when handle is unsubscribed, but this struct hasn't been cleaned up yet. */ bool is_subscribed; - bool is_cross_thread; }; /* Default timeout is 100 seconds. */ static uint64_t DEFAULT_TIMEOUT = 100ULL * 1000000000; /* Special constant, _NOTIFY_COND_MASK, limits the maximum value that can be used as user data in I/O events. * For example, it's 28-bit wide on QNX 8.0. */ + // TODO Use unsigned int? static uint32_t MAX_HANDLE_ID = _NOTIFY_COND_MASK; /* SI_NOTIFY is a QNX special sigev code requesting resource managers to return active event type along with the event * itself. */ static short IO_EVENT_PULSE_SIGEV_CODE = SI_NOTIFY; static short CROSS_THREAD_PULSE_SIGEV_CODE = _PULSE_CODE_MINAVAIL; +static short IO_EVENT_KICKSTART_SIGEV_CODE = _PULSE_CODE_MINAVAIL + 1; +static short IO_EVENT_UPDATE_ERROR_SIGEV_CODE = _PULSE_CODE_MINAVAIL + 2; /* Setup edge triggered ionotify with a scheduler. 
*/ struct aws_event_loop *aws_event_loop_new_default_with_options( @@ -345,56 +350,6 @@ static void s_cancel_task(struct aws_event_loop *event_loop, struct aws_task *ta aws_task_scheduler_cancel_task(&ionotify_loop->scheduler, task); } -static void s_update_io_result( - struct aws_event_loop *event_loop, - struct aws_io_handle *handle, - const struct aws_io_handle_io_op_result *io_op_result) { - - AWS_ASSERT(handle->additional_data); - struct ionotify_event_data *ionotify_event_data = handle->additional_data; - AWS_ASSERT(event_loop == ionotify_event_data->event_loop); - AWS_LOGF_TRACE( - AWS_LS_IO_EVENT_LOOP, - "id=%p: Got result for I/O operation for fd %d: read status %d (%s); write status %d (%s)", - (void *)event_loop, - handle->data.fd, - io_op_result->read_error_code, - aws_error_str(io_op_result->read_error_code), - io_op_result->write_error_code, - aws_error_str(io_op_result->write_error_code)); - uint32_t event_mask = _NOTIFY_COND_OBAND; - if (io_op_result->read_error_code == AWS_IO_READ_WOULD_BLOCK) { - event_mask |= _NOTIFY_COND_INPUT; - } - if (io_op_result->write_error_code == AWS_IO_READ_WOULD_BLOCK) { - event_mask |= _NOTIFY_COND_OUTPUT; - } - - if (event_mask != 0) { - AWS_LOGF_TRACE( - AWS_LS_IO_EVENT_LOOP, "id=%p: Got EWOULDBLOCK for fd %d, rearming it", (void *)event_loop, handle->data.fd); - int rc = ionotify( - ionotify_event_data->handle->data.fd, _NOTIFY_ACTION_EDGEARM, event_mask, &ionotify_event_data->event); - int errno_value = errno; - AWS_LOGF_TRACE( - AWS_LS_IO_EVENT_LOOP, - "id=%p: Rearming ionotify returned %d (input %d; output %d)", - (void *)event_loop, - rc, - rc & _NOTIFY_COND_INPUT, - rc & _NOTIFY_COND_OUTPUT); - if (rc == -1) { - AWS_LOGF_ERROR( - AWS_LS_IO_EVENT_LOOP, - "id=%p: Failed to subscribe to events on fd %d: error %d (%s)", - (void *)event_loop, - ionotify_event_data->handle->data.fd, - errno_value, - strerror(errno_value)); - } - } -} - /* Map ionotify_event_data to internal ID limited by MAX_HANDLE_ID value. 
*/ static int s_add_handle(struct ionotify_loop *ionotify_loop, struct ionotify_event_data *ionotify_event_data) { AWS_ASSERT(s_is_on_callers_thread(ionotify_event_data->event_loop)); @@ -451,11 +406,11 @@ static void s_remove_handle( aws_hash_table_remove(&ionotify_loop->handles, (void *)handle_id, NULL, NULL); } -/* Scheduled task that connects aws_io_handle with the kqueue */ +/* Scheduled task that performs the actual subscription using ionotify. */ static void s_subscribe_task(struct aws_task *task, void *user_data, enum aws_task_status status) { (void)task; - /* if task was cancelled, nothing to do */ + /* If task was cancelled, nothing to do. */ if (status == AWS_TASK_STATUS_CANCELED) { return; } @@ -466,51 +421,65 @@ static void s_subscribe_task(struct aws_task *task, void *user_data, enum aws_ta AWS_LOGF_TRACE( AWS_LS_IO_EVENT_LOOP, - "id=%p: Subscribing to events on fd %d", + "id=%p: Subscribing to events on fd %d for events %d", (void *)event_loop, - ionotify_event_data->handle->data.fd); + ionotify_event_data->handle->data.fd, + ionotify_event_data->events_subscribed); + + /* Map ionotify_event_data to ID. This ID will be returned with the I/O events from ionotify. */ + if (ionotify_event_data->handle_id == 0) { + s_add_handle(ionotify_loop, ionotify_event_data); + AWS_LOGF_TRACE( + AWS_LS_IO_EVENT_LOOP, + "id=%p: Mapped fd %d to handle ID %u", + (void *)event_loop, + ionotify_event_data->handle->data.fd, + ionotify_event_data->handle_id); + /* I/O events from ionotify will be delivered as pulses with a user-defined 28-bit ID. + * SIGEV_PULSE_PRIO_INHERIT means the thread that receives the pulse will run at the initial priority of the + * process. 
 */ + short pulse_priority = SIGEV_PULSE_PRIO_INHERIT; + short pulse_sigev_code = IO_EVENT_PULSE_SIGEV_CODE; + SIGEV_PULSE_INT_INIT( + &ionotify_event_data->event, + ionotify_event_data->pulse_connection_id, + pulse_priority, + pulse_sigev_code, + ionotify_event_data->handle_id); + /* Set special bits in the event structure to allow resource managers to set I/O event types in the sigev_code field */ + ionotify_event_data->event.sigev_notify |= SIGEV_FLAG_CODE_UPDATEABLE; + SIGEV_MAKE_UPDATEABLE(&ionotify_event_data->event); + + /* The application must register the event by calling MsgRegisterEvent() with the fd processed in ionotify(). + * See: + * https://www.qnx.com/developers/docs/8.0/com.qnx.doc.neutrino.lib_ref/topic/i/ionotify.html + * https://www.qnx.com/developers/docs/8.0/com.qnx.doc.neutrino.lib_ref/topic/m/msgregisterevent.html + * + * It's enough to register an event only once and then reuse it on followup ionotify rearming calls. + * NOTE: If you create a new sigevent for the same file descriptor, with the same flags, you HAVE to register it. */ + MsgRegisterEvent(&ionotify_event_data->event, ionotify_event_data->handle->data.fd); + } + + ionotify_event_data->is_subscribed = true; - /* Everyone is always registered for out-of-band data and errors. */ - uint32_t event_mask = _NOTIFY_COND_OBAND | _NOTIFY_COND_EXTEN; + /* Everyone is always registered for errors. */ + uint32_t event_mask = _NOTIFY_COND_EXTEN | _NOTIFY_CONDE_ERR | _NOTIFY_CONDE_HUP | _NOTIFY_CONDE_NVAL; if (ionotify_event_data->events_subscribed & AWS_IO_EVENT_TYPE_READABLE) { event_mask |= _NOTIFY_COND_INPUT; + event_mask |= _NOTIFY_CONDE_RDNORM; + event_mask |= _NOTIFY_COND_OBAND; } if (ionotify_event_data->events_subscribed & AWS_IO_EVENT_TYPE_WRITABLE) { event_mask |= _NOTIFY_COND_OUTPUT; + event_mask |= _NOTIFY_CONDE_WRNORM; } - /* Map ionotify_event_data to ID. This ID will be returned with the I/O events from ionotify. 
*/ - s_add_handle(ionotify_loop, ionotify_event_data); - - /* I/O events from ionotify will be delivered as pulses with a user-defined 28-bit ID. - * SIGEV_PULSE_PRIO_INHERIT means the thread that receives the pulse will run at the initial priority of the - * process. */ - short pulse_priority = SIGEV_PULSE_PRIO_INHERIT; - short pulse_sigev_code = IO_EVENT_PULSE_SIGEV_CODE; - SIGEV_PULSE_INT_INIT( - &ionotify_event_data->event, - ionotify_event_data->pulse_connection_id, - pulse_priority, - pulse_sigev_code, - ionotify_event_data->handle_id); - /* Set special bits in the event structure to allow resource managers set I/O event types in the sigev_code field */ - SIGEV_MAKE_UPDATEABLE(&ionotify_event_data->event); - - /* The application must register the event by calling MsgRegisterEvent() with the fd processed in ionotify(). - * See: - * https://www.qnx.com/developers/docs/8.0/com.qnx.doc.neutrino.lib_ref/topic/i/ionotify.html - * https://www.qnx.com/developers/docs/8.0/com.qnx.doc.neutrino.lib_ref/topic/m/msgregisterevent.html - * - * It's enough to register an event only once and then reuse it on followup ionotify rearming calls. - * NOTE: If you create a new sigevent for the same file descriptor, with the same flags, you HAVE to register it. */ - MsgRegisterEvent(&ionotify_event_data->event, ionotify_event_data->handle->data.fd); - - ionotify_event_data->is_subscribed = true; - - /* Arm resource manager associated with a given file descriptor in edge-triggered mode. */ + /* Arm resource manager associated with a given file descriptor in edge-triggered mode. + * After this call, a corresponding resource manager starts sending events. 
*/ int rc = ionotify(ionotify_event_data->handle->data.fd, _NOTIFY_ACTION_EDGEARM, event_mask, &ionotify_event_data->event); int errno_value = errno; + if (rc == -1) { AWS_LOGF_ERROR( AWS_LS_IO_EVENT_LOOP, @@ -523,21 +492,19 @@ static void s_subscribe_task(struct aws_task *task, void *user_data, enum aws_ta event_loop, ionotify_event_data->handle, AWS_IO_EVENT_TYPE_ERROR, ionotify_event_data->user_data); return; } - /* Set callback for I/O operation results. */ - ionotify_event_data->handle->update_io_result = s_update_io_result; - /* Send notification to kick-start processing fd if it has desired conditions already. */ + /* ionotify returns active conditions if there are any. Send notification to kick-start processing fd if it has desired conditions. */ int kick_start_event_mask = rc & _NOTIFY_COND_MASK; if (kick_start_event_mask != 0) { + // TODO Handle HUP and its friends. AWS_LOGF_TRACE( AWS_LS_IO_EVENT_LOOP, - "id=%p: Sending pulse for fd %d because it has met I/O conditions", - (void *)event_loop, - ionotify_event_data->handle->data.fd); + "id=%p: Sending pulse for fd %d because it has desired I/O conditions (rc is %d)", + (void *)event_loop, ionotify_event_data->handle->data.fd, rc); /* Set _NOTIFY_COND_MASK low bits to ID that will be */ kick_start_event_mask |= ionotify_event_data->handle_id; int send_rc = - MsgSendPulse(ionotify_loop->pulse_connection_id, -1, IO_EVENT_PULSE_SIGEV_CODE, kick_start_event_mask); + MsgSendPulse(ionotify_loop->pulse_connection_id, -1, IO_EVENT_KICKSTART_SIGEV_CODE, kick_start_event_mask); if (send_rc == -1) { AWS_LOGF_ERROR( AWS_LS_IO_EVENT_LOOP, @@ -548,6 +515,75 @@ static void s_subscribe_task(struct aws_task *task, void *user_data, enum aws_ta } } +/* This callback is called by I/O operations to notify about their results. 
*/ +static void s_update_io_result( + struct aws_event_loop *event_loop, + struct aws_io_handle *handle, + const struct aws_io_handle_io_op_result *io_op_result) { + + AWS_ASSERT(handle->additional_data); + struct ionotify_event_data *ionotify_event_data = handle->additional_data; + + if (event_loop != ionotify_event_data->event_loop) { + /* TODO */ + AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: Got I/O operation result from another thread", (void *)event_loop); + return; + } + + AWS_LOGF_TRACE( + AWS_LS_IO_EVENT_LOOP, + "id=%p: Got result for I/O operation for fd %d: status %d (%s); read status %d (%s); write status %d (%s)", + (void *)event_loop, + handle->data.fd, + io_op_result->error_code, + aws_error_str(io_op_result->error_code), + io_op_result->read_error_code, + aws_error_str(io_op_result->read_error_code), + io_op_result->write_error_code, + aws_error_str(io_op_result->write_error_code)); + + uint32_t event_mask = 0; + if (io_op_result->error_code == AWS_IO_SOCKET_CLOSED) { + ionotify_event_data->latest_io_operation_error_code = AWS_IO_EVENT_TYPE_CLOSED; + } + if (io_op_result->read_error_code == AWS_IO_READ_WOULD_BLOCK) { + event_mask |= AWS_IO_EVENT_TYPE_READABLE; + } + if (io_op_result->write_error_code == AWS_IO_READ_WOULD_BLOCK) { + event_mask |= AWS_IO_EVENT_TYPE_WRITABLE; + } + + /* Rearm resource manager. */ + if (event_mask != 0) { + AWS_LOGF_TRACE( + AWS_LS_IO_EVENT_LOOP, "id=%p: Got EWOULDBLOCK for fd %d, rearming it", (void *)event_loop, handle->data.fd); + /* We're on the event loop thread, just run the subscribing task. */ + ionotify_event_data->events_subscribed = event_mask; + s_subscribe_task(NULL, ionotify_event_data, AWS_TASK_STATUS_CANCELED); + } + + /* Notify event loop of error condition. 
*/ + if (ionotify_event_data->latest_io_operation_error_code != 0) { + AWS_LOGF_TRACE( + AWS_LS_IO_EVENT_LOOP, + "id=%p: fd errored, sending pulse for fd %d", + (void *)event_loop, ionotify_event_data->handle->data.fd); + struct ionotify_loop *ionotify_loop = event_loop->impl_data; + int send_rc = + MsgSendPulse(ionotify_loop->pulse_connection_id, -1, IO_EVENT_UPDATE_ERROR_SIGEV_CODE, ionotify_event_data->handle_id); + int errno_value = errno; + if (send_rc == -1) { + AWS_LOGF_ERROR( + AWS_LS_IO_EVENT_LOOP, + "id=%p: Failed to send UPDATE_ERROR pulse for fd %d: error %d (%s)", + (void *)event_loop, + ionotify_event_data->handle->data.fd, + errno_value, + strerror(errno_value)); + } + } +} + static int s_subscribe_to_io_events( struct aws_event_loop *event_loop, struct aws_io_handle *handle, @@ -569,6 +605,7 @@ static int s_subscribe_to_io_events( ionotify_event_data->events_subscribed = events; ionotify_event_data->pulse_connection_id = ionotify_loop->pulse_connection_id; ionotify_event_data->user_data = user_data; + ionotify_event_data->handle->update_io_result = s_update_io_result; aws_task_init( &ionotify_event_data->subscribe_task, s_subscribe_task, ionotify_event_data, "ionotify_event_loop_subscribe"); @@ -600,16 +637,12 @@ static int s_unsubscribe_from_io_events(struct aws_event_loop *event_loop, struc struct ionotify_event_data *ionotify_event_data = handle->additional_data; /* Disarm resource manager for a given fd. 
*/ - uint32_t event_mask = _NOTIFY_COND_OBAND; - if (ionotify_event_data->events_subscribed & AWS_IO_EVENT_TYPE_READABLE) { - event_mask |= _NOTIFY_COND_INPUT; - } - if (ionotify_event_data->events_subscribed & AWS_IO_EVENT_TYPE_WRITABLE) { - event_mask |= _NOTIFY_COND_OUTPUT; - } + uint32_t event_mask = _NOTIFY_COND_EXTEN | _NOTIFY_CONDE_ERR | _NOTIFY_CONDE_HUP | _NOTIFY_CONDE_NVAL; + event_mask |= _NOTIFY_COND_INPUT | _NOTIFY_CONDE_RDNORM | _NOTIFY_COND_OBAND; + event_mask |= _NOTIFY_COND_OUTPUT | _NOTIFY_CONDE_WRNORM; int rc = ionotify(ionotify_event_data->handle->data.fd, _NOTIFY_ACTION_EDGEARM, event_mask, NULL); int errno_value = errno; - if (rc < 0) { + if (rc == -1) { AWS_LOGF_ERROR( AWS_LS_IO_EVENT_LOOP, "id=%p: Failed to unsubscribe from events on fd %d: error %d (%s)", @@ -713,6 +746,80 @@ static void s_aws_ionotify_cleanup_aws_lc_thread_local_state(void *user_data) { aws_cal_thread_clean_up(); } +static void s_process_pulse(struct aws_event_loop *event_loop, const struct _pulse *pulse, bool *should_process_cross_thread_tasks ) { + if (pulse->code == CROSS_THREAD_PULSE_SIGEV_CODE) { + AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: MsgReceived got cross-thread pulse", (void *)event_loop); + *should_process_cross_thread_tasks = true; + return; + } + + int user_data = pulse->value.sival_int; + + uint32_t handle_id = user_data & _NOTIFY_DATA_MASK; + if (handle_id == 0) { + AWS_LOGF_ERROR(AWS_LS_IO_EVENT_LOOP, "id=%p: Got pulse with empty handle ID, ignoring it", (void *)event_loop); + return; + } + + AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: Got pulse for handle ID %u", (void *)event_loop, handle_id); + + struct ionotify_loop *ionotify_loop = event_loop->impl_data; + struct ionotify_event_data *ionotify_event_data = s_find_handle(event_loop, ionotify_loop, handle_id); + if (ionotify_event_data == NULL) { + /* This situation is totally OK when the corresponding fd is already unsubscribed. 
*/ + AWS_LOGF_DEBUG( + AWS_LS_IO_EVENT_LOOP, + "id=%p: No mapped data found for handle ID %d, fd must be already unsubscribed", + (void *)event_loop, + handle_id); + return; + } + + if (!ionotify_event_data->is_subscribed) { + return; + } + + AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: Processing fd %d: pulse code %d", (void *)event_loop, ionotify_event_data->handle->data.fd, pulse->code); + int event_mask = 0; + if (pulse->value.sival_int & _NOTIFY_COND_OBAND) { + AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: fd got out-of-band data", (void *)event_loop); + event_mask |= AWS_IO_EVENT_TYPE_READABLE; + } + if (pulse->value.sival_int & _NOTIFY_COND_INPUT) { + AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: fd is readable", (void *)event_loop); + event_mask |= AWS_IO_EVENT_TYPE_READABLE; + } + if (pulse->value.sival_int & _NOTIFY_COND_OUTPUT) { + AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: fd is writable", (void *)event_loop); + event_mask |= AWS_IO_EVENT_TYPE_WRITABLE; + } + if (pulse->value.sival_int & _NOTIFY_COND_EXTEN) { + AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: fd has extended condition: %d %d", (void *)event_loop, pulse->code, ionotify_event_data->event.sigev_code); + // TODO + if (pulse->code != IO_EVENT_PULSE_SIGEV_CODE) { + //event_mask |= AWS_IO_EVENT_TYPE_CLOSED; + } + } + + if (ionotify_event_data->latest_io_operation_error_code == AWS_IO_EVENT_TYPE_CLOSED) { + AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: latest_io_operation_error_code is AWS_IO_EVENT_TYPE_CLOSED", (void *)event_loop); + event_mask |= AWS_IO_EVENT_TYPE_CLOSED; + } + + /* Reset the I/O operation code to not process it twice. */ + ionotify_event_data->latest_io_operation_error_code = 0; + + /* QNX docs says the event types bits should be cleared after receiving a new I/O event. Even + * though on QNX 8.0 only pulse struct sets these bits, clear the bits on sigevent struct + * anyway, in case it's different on other QNX versions. 
*/ + ionotify_event_data->event.sigev_value.sival_int &= _NOTIFY_DATA_MASK; + + // TODO remove + AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: Calling on_event with event mask %d", (void *)event_loop, event_mask); + + ionotify_event_data->on_event(event_loop, ionotify_event_data->handle, event_mask, ionotify_event_data->user_data); +} + static void aws_event_loop_thread(void *args) { struct aws_event_loop *event_loop = args; AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: main loop started", (void *)event_loop); @@ -746,69 +853,7 @@ static void aws_event_loop_thread(void *args) { AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: Wake up with rcvid %ld\n", (void *)event_loop, rcvid); if (rcvid == 0) { - if (pulse.code == CROSS_THREAD_PULSE_SIGEV_CODE) { - AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: MsgReceived got cross-thread pulse", (void *)event_loop); - should_process_cross_thread_tasks = true; - } else if (pulse.code == IO_EVENT_PULSE_SIGEV_CODE) { - int handle_id = pulse.value.sival_int & _NOTIFY_DATA_MASK; - AWS_LOGF_TRACE( - AWS_LS_IO_EVENT_LOOP, "id=%p: Got pulse for handle ID %u\n", (void *)event_loop, handle_id); - if (handle_id != 0) { - struct ionotify_event_data *ionotify_event_data = - s_find_handle(event_loop, ionotify_loop, handle_id); - if (ionotify_event_data == NULL) { - /* This situation is totally OK when the corresponding fd is already unsubscribed. 
*/ - AWS_LOGF_DEBUG( - AWS_LS_IO_EVENT_LOOP, - "id=%p: No mapped data found for handle ID %d, fd must be already unsubscribed", - (void *)event_loop, - handle_id); - } else if (ionotify_event_data->is_subscribed) { - int event_mask = 0; - if (pulse.value.sival_int & _NOTIFY_COND_OBAND) { - AWS_LOGF_TRACE( - AWS_LS_IO_EVENT_LOOP, - "id=%p: File descriptor got out-of-band data", - (void *)event_loop); - event_mask |= AWS_IO_EVENT_TYPE_READABLE; - } - if (pulse.value.sival_int & _NOTIFY_COND_INPUT) { - AWS_LOGF_TRACE( - AWS_LS_IO_EVENT_LOOP, "id=%p: File descriptor is readable", (void *)event_loop); - event_mask |= AWS_IO_EVENT_TYPE_READABLE; - } - if (pulse.value.sival_int & _NOTIFY_COND_OUTPUT) { - AWS_LOGF_TRACE( - AWS_LS_IO_EVENT_LOOP, "id=%p: File descriptor is writable", (void *)event_loop); - event_mask |= AWS_IO_EVENT_TYPE_WRITABLE; - } - if (pulse.value.sival_int & _NOTIFY_COND_EXTEN) { - AWS_LOGF_TRACE( - AWS_LS_IO_EVENT_LOOP, "id=%p: File descriptor is errored", (void *)event_loop); - event_mask |= AWS_IO_EVENT_TYPE_CLOSED; - } - - /* QNX docs says the event types bits should be cleared after receiving a new I/O event. Even - * though on QNX 8.0 only pulse struct sets these bits, clear the bits on sigevent struct - * anyway, in case it'll change in the future. 
*/ - ionotify_event_data->event.sigev_value.sival_int &= _NOTIFY_DATA_MASK; - - ionotify_event_data->on_event( - event_loop, ionotify_event_data->handle, event_mask, ionotify_event_data->user_data); - } - } else { - AWS_LOGF_ERROR( - AWS_LS_IO_EVENT_LOOP, - "id=%p: Got I/O event pulse with empty handle ID, ignoring it", - (void *)event_loop); - } - } else { - AWS_LOGF_WARN( - AWS_LS_IO_EVENT_LOOP, - "id=%p: MsgReceived got pulse with unknown code %d, ignoring it", - (void *)event_loop, - pulse.code); - } + s_process_pulse(event_loop, &pulse, &should_process_cross_thread_tasks); } else if (rcvid > 0) { AWS_LOGF_WARN(AWS_LS_IO_EVENT_LOOP, "id=%p: Received message, ignoring it\n", (void *)event_loop); } else { diff --git a/tests/pipe_test.c b/tests/pipe_test.c index 053c5aefd..737af7584 100644 --- a/tests/pipe_test.c +++ b/tests/pipe_test.c @@ -417,6 +417,8 @@ PIPE_TEST_CASE(pipe_read_write_large_buffer, GIANT_BUFFER_SIZE); static void s_on_readable_event(struct aws_pipe_read_end *read_end, int error_code, void *user_data) { + AWS_LOGF_TRACE(12, "=== s_on_readable_event in tests is called"); + struct pipe_state *state = user_data; if (error_code == state->readable_events.error_code_to_monitor) { @@ -430,6 +432,9 @@ static void s_on_readable_event(struct aws_pipe_read_end *read_end, int error_co s_signal_done_on_read_end_closed(state); } } + else { + aws_pipe_read(&state->read_end, &state->buffers.dst, NULL); + } return; error: