Skip to content

Commit

Permalink
wasm abi: impl row_iter_bsatn_advance & row_iter_bsatn_close
Browse files Browse the repository at this point in the history
  • Loading branch information
Centril committed Aug 21, 2024
1 parent 2a9ae03 commit 520b9d0
Show file tree
Hide file tree
Showing 13 changed files with 216 additions and 109 deletions.
5 changes: 5 additions & 0 deletions crates/bindings-csharp/Runtime/Exceptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ public class BufferTooSmallException : StdbException
public override string Message => "The provided buffer is not large enough to store the data";
}

public class NoSuchIterException : StdbException
{
public override string Message => "The provided row iterator does not exist";
}

public class NoSuchBytesException : StdbException
{
public override string Message => "The provided bytes source or sink does not exist";
Expand Down
7 changes: 4 additions & 3 deletions crates/bindings-csharp/Runtime/Internal/FFI.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public enum Errno : ushort
HOST_CALL_FAILURE = 1,
NO_SUCH_TABLE = 4,
LOOKUP_NOT_FOUND = 2,
NO_SUCH_ITER = 6,
NO_SUCH_BYTES = 8,
NO_SPACE = 9,
BUFFER_TOO_SMALL = 11,
Expand Down Expand Up @@ -110,7 +111,7 @@ public readonly struct LogLevel(byte log_level)
[StructLayout(LayoutKind.Sequential)]
public readonly record struct RowIter(uint Handle)
{
public static readonly RowIter INVALID = new(uint.MaxValue);
public static readonly RowIter INVALID = new(0);
}

[LibraryImport(StdbNamespace)]
Expand Down Expand Up @@ -161,14 +162,14 @@ out RowIter out_
);

[LibraryImport(StdbNamespace)]
public static partial CheckedStatus _iter_advance(
public static partial short _row_iter_bsatn_advance(
RowIter iter_handle,
[MarshalUsing(CountElementName = nameof(buffer_len))][Out] byte[] buffer,
ref uint buffer_len
);

[LibraryImport(StdbNamespace)]
public static partial void _iter_drop(RowIter iter_handle);
public static partial CheckedStatus _row_iter_bsatn_close(RowIter iter_handle);

[LibraryImport(StdbNamespace)]
public static partial void _console_log(
Expand Down
34 changes: 24 additions & 10 deletions crates/bindings-csharp/Runtime/Internal/ITable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,31 +22,45 @@ public class Enumerator(FFI.RowIter handle) : IDisposable

public bool MoveNext()
{
if (handle.Equals(FFI.RowIter.INVALID))
{
return false;
}

uint buffer_len;
while (true)
{
buffer_len = (uint)buffer.Length;
try
var ret = FFI._row_iter_bsatn_advance(handle, buffer, ref buffer_len);
if (ret <= 0)
{
FFI._iter_advance(handle, buffer, ref buffer_len);
Current = new byte[buffer_len];
Array.Copy(buffer, 0, Current, 0, buffer_len);
}
catch (BufferTooSmallException)
switch (ret)
{
buffer = new byte[buffer_len];
continue;
// Iterator exhausted, we're done.
case -1:
handle = FFI.RowIter.INVALID;
return buffer_len != 0;
case 0:
return buffer_len != 0;
case (short)(ushort)FFI.Errno.NO_SUCH_ITER:
throw new NoSuchIterException();
case (short)(ushort)FFI.Errno.BUFFER_TOO_SMALL:
buffer = new byte[buffer_len];
continue;
default:
throw new UnknownException((FFI.Errno)(ushort)ret);
}
break;
}
Current = new byte[buffer_len];
Array.Copy(buffer, 0, Current, 0, buffer_len);
return buffer_len != 0;
}

public void Dispose()
{
if (!handle.Equals(FFI.RowIter.INVALID))
{
FFI._iter_drop(handle);
FFI._row_iter_bsatn_close(handle);
handle = FFI.RowIter.INVALID;
// Avoid running ~RowIter if Dispose was executed successfully.
GC.SuppressFinalize(this);
Expand Down
8 changes: 4 additions & 4 deletions crates/bindings-csharp/Runtime/bindings.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ IMPORT(Status, _iter_start_filtered,
(TableId table_id, const uint8_t* filter, uint32_t filter_len,
RowIter* iter),
(table_id, filter, filter_len, iter));
IMPORT(Status, _iter_advance,
(RowIter iter, uint8_t* buffer, size_t* buffer_len),
(iter, buffer, buffer_len));
IMPORT(void, _iter_drop, (RowIter iter), (iter));
IMPORT(int16_t, _row_iter_bsatn_advance,
(RowIter iter, uint8_t* buffer_ptr, size_t* buffer_len_ptr),
(iter, buffer_ptr, buffer_len_ptr));
IMPORT(uint16_t, _row_iter_bsatn_close, (RowIter iter), (iter));
IMPORT(int16_t, _bytes_source_read, (BytesSource source, uint8_t* buffer_ptr, size_t* buffer_len_ptr),
(source, buffer_ptr, buffer_len_ptr));
IMPORT(uint16_t, _bytes_sink_write, (BytesSink sink, uint8_t* buffer_ptr, size_t* buffer_len_ptr),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@
<UnmanagedEntryPointsAssembly Include="SpacetimeDB.Runtime" />
<WasmImport Include="$(SpacetimeNamespace)!_console_log" />
<WasmImport Include="$(SpacetimeNamespace)!_get_table_id" />
<WasmImport Include="$(SpacetimeNamespace)!_create_index" />
<WasmImport Include="$(SpacetimeNamespace)!_iter_by_col_eq" />
<WasmImport Include="$(SpacetimeNamespace)!_insert" />
<WasmImport Include="$(SpacetimeNamespace)!_delete_by_col_eq" />
<WasmImport Include="$(SpacetimeNamespace)!_delete_by_rel" />
<WasmImport Include="$(SpacetimeNamespace)!_iter_start" />
<WasmImport Include="$(SpacetimeNamespace)!_iter_start_filtered" />
<WasmImport Include="$(SpacetimeNamespace)!_iter_next" />
<WasmImport Include="$(SpacetimeNamespace)!_iter_drop" />
<WasmImport Include="$(SpacetimeNamespace)!_row_iter_bsatn_advance" />
<WasmImport Include="$(SpacetimeNamespace)!_row_iter_bsatn_close" />
<WasmImport Include="$(SpacetimeNamespace)!_bytes_source_read" />
<WasmImport Include="$(SpacetimeNamespace)!_bytes_sink_write" />

Expand Down
88 changes: 58 additions & 30 deletions crates/bindings-sys/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,24 +139,42 @@ pub mod raw {
/// - `filter + filter_len` overflows a 64-bit integer
pub fn _iter_start_filtered(table_id: TableId, filter: *const u8, filter_len: usize, out: *mut RowIter) -> u16;

/// Reads rows from the given iterator.
/// Reads rows from the given iterator registered under `iter`.
///
/// Takes rows from the iterator and stores them in the memory pointed to by `buffer`,
/// encoded in BSATN format. `buffer_len` should be a pointer to the capacity of `buffer`,
/// and on success it is set to the combined length of the encoded rows. If it is `0`,
/// the iterator is exhausted and there are no more rows to read.
/// Takes rows from the iterator
/// and stores them in the memory pointed to by `buffer = buffer_ptr[..buffer_len]`,
/// encoded in BSATN format.
///
/// If no rows can fit in the buffer, `BUFFER_TOO_SMALL` is returned and `buffer_len` is
/// set to the size of the next row in the iterator. The caller should reallocate the
/// buffer to at least that size and try again.
/// The `buffer_len = buffer_len_ptr[..size_of::<usize>()]` stores the capacity of `buffer`.
/// On success (`0` or `-1` is returned),
/// `buffer_len` is set to the combined length of the encoded rows.
/// When `-1` is returned, the iterator has been exhausted
/// and there are no more rows to read,
/// leading to the iterator being immediately destroyed.
/// Note that the host is free to reuse allocations in a pool,
/// destroying the handle logically does not entail that memory is necessarily reclaimed.
///
/// `iter` must be a valid iterator, or the module will trap.
pub fn _iter_advance(iter: RowIter, buffer: *mut u8, buffer_len: *mut usize) -> u16;
/// # Traps
///
/// Traps if:
///
/// - `buffer_len_ptr` is NULL or `buffer_len` is not in bounds of WASM memory.
/// - `buffer_ptr` is NULL or `buffer` is not in bounds of WASM memory.
///
/// # Errors
///
/// Returns an error:
///
/// - `NO_SUCH_ITER`, when `iter` is not a valid iterator.
/// - `BUFFER_TOO_SMALL`, when there are rows left but they cannot fit in `buffer`.
/// When this occurs, `buffer_len` is set to the size of the next item in the iterator.
/// To make progress, the caller should reallocate the buffer to at least that size and try again.
pub fn _row_iter_bsatn_advance(iter: RowIter, buffer: *mut u8, buffer_len: *mut usize) -> i16;

/// Destroys the iterator.
///
/// `iter` must be a valid iterator, or the module will trap.
pub fn _iter_drop(iter: RowIter);
pub fn _row_iter_bsatn_close(iter: RowIter) -> u16;

/// Log at `level` a `message` message occuring in `filename:line_number`
/// with [`target`] being the module path at the `log!` invocation site.
Expand Down Expand Up @@ -364,6 +382,11 @@ pub mod raw {
#[repr(transparent)]
pub struct RowIter(u32);

impl RowIter {
/// An invalid handle, used e.g., when the iterator has been exhausted.
pub const INVALID: Self = Self(0);
}

#[cfg(any())]
mod module_exports {
type Encoded<T> = Buffer;
Expand Down Expand Up @@ -478,16 +501,10 @@ fn cvt(x: u16) -> Result<(), Errno> {
/// It's not required to write to `out` when `f(out)` returns an error code.
/// - The function `f` never reads a safe and valid `T` from the `out` pointer
/// before writing a safe and valid `T` to it.
/// - If running `Drop` on `T` is required for safety,
/// `f` must never panic nor return an error once `out` has been written to.
#[inline]
unsafe fn call<T>(f: impl FnOnce(*mut T) -> u16) -> Result<T, Errno> {
unsafe fn call<T: Copy>(f: impl FnOnce(*mut T) -> u16) -> Result<T, Errno> {
let mut out = MaybeUninit::uninit();
// TODO: If we have a panic here after writing a safe `T` to `out`,
// we will may have a memory leak if `T` requires running `Drop` for cleanup.
let f_code = f(out.as_mut_ptr());
// TODO: A memory leak may also result due to an error code from `f(out)`
// if `out` has been written to.
cvt(f_code)?;
Ok(out.assume_init())
}
Expand Down Expand Up @@ -673,29 +690,40 @@ pub struct RowIter {
}

impl RowIter {
/// Read some number of bsatn-encoded rows into the provided buffer.
/// Read some number of BSATN-encoded rows into the provided buffer.
///
/// If the iterator is exhausted and did not read anything into buf, 0 is returned. Otherwise,
/// it's the number of new bytes that were added to the end of the buffer.
pub fn read(&self, buf: &mut Vec<u8>) -> usize {
/// If the iterator is exhausted, `None` is returned.
/// Otherwise, the number of new bytes added to the end of the buffer is returned.
pub fn read(&mut self, buf: &mut Vec<u8>) -> Option<usize> {
loop {
let buf_ptr = buf.spare_capacity_mut();
let mut buf_len = buf_ptr.len();
match cvt(unsafe { raw::_iter_advance(self.raw, buf_ptr.as_mut_ptr().cast(), &mut buf_len) }) {
Ok(()) => {
// SAFETY: iter_advance just wrote `buf_len` bytes into the end of `buf`.
unsafe { buf.set_len(buf.len() + buf_len) };
return buf_len;
let ret = unsafe { raw::_row_iter_bsatn_advance(self.raw, buf_ptr.as_mut_ptr().cast(), &mut buf_len) };
if ret <= 0 {
// SAFETY: `_row_iter_bsatn_advance` just wrote `buf_len` bytes into the end of `buf`.
unsafe { buf.set_len(buf.len() + buf_len) };
}

const TOO_SMALL: i16 = errno::BUFFER_TOO_SMALL.get() as i16;
match ret {
-1 => {
self.raw = raw::RowIter::INVALID;
return None;
}
Err(Errno::BUFFER_TOO_SMALL) => buf.reserve(buf_len),
Err(e) => panic!("unexpected error from _iter_advance: {e}"),
0 => return Some(buf_len),
TOO_SMALL => buf.reserve(buf_len),
e => panic!("unexpected error from `_row_iter_bsatn_advance`: {e}"),
}
}
}
}

impl Drop for RowIter {
fn drop(&mut self) {
unsafe { raw::_iter_drop(self.raw) }
// Avoid this syscall when `_row_iter_bsatn_advance` above
// notifies us that the iterator is exhausted.
if self.raw != raw::RowIter::INVALID {
unsafe { raw::_row_iter_bsatn_close(self.raw) }
}
}
}
8 changes: 5 additions & 3 deletions crates/bindings/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ impl<T: TableType> Iterator for TableIter<T> {
// Otherwise, try to fetch the next chunk while reusing the buffer.
self.reader.buf.clear();
self.reader.pos.set(0);
if self.inner.read(&mut self.reader.buf) == 0 {
if self.inner.read(&mut self.reader.buf).unwrap_or_default() == 0 {
return None;
}
}
Expand Down Expand Up @@ -465,10 +465,12 @@ pub mod query {
val: &T,
) -> Option<Table> {
// Find the row with a match.
let iter = iter_by_col_eq(Table::table_id(), COL_IDX.into(), val).unwrap();
let mut iter = iter_by_col_eq(Table::table_id(), COL_IDX.into(), val).unwrap();
with_row_buf(|buf| {
// We will always find either 0 or 1 rows here due to the unique constraint.
iter.read(buf);
let r = iter.read(buf);
debug_assert_eq!(r, None);

if buf.is_empty() {
return None;
}
Expand Down
15 changes: 10 additions & 5 deletions crates/bindings/src/rt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -501,17 +501,24 @@ fn with_read_args<R>(args: BytesSource, logic: impl FnOnce(&[u8]) -> R) -> R {
ret
}

const NO_SPACE: u16 = errno::NO_SPACE.get();
const NO_SUCH_BYTES: u16 = errno::NO_SUCH_BYTES.get();

/// Read `source` from the host fully into `buf`.
fn read_bytes_source_into(source: BytesSource, buf: &mut Vec<u8>) {
const INVALID: i16 = NO_SUCH_BYTES as i16;

loop {
// Write into the spare capacity of the buffer.
let buf_ptr = buf.spare_capacity_mut();
let spare_len = buf_ptr.len();
let mut buf_len = buf_ptr.len();
let buf_ptr = buf_ptr.as_mut_ptr().cast();
let ret = unsafe { sys::raw::_bytes_source_read(source, buf_ptr, &mut buf_len) };
// SAFETY: `bytes_source_read` just appended `spare_len` bytes to `buf`.
unsafe { buf.set_len(buf.len() + spare_len) };
if ret <= 0 {
// SAFETY: `bytes_source_read` just appended `spare_len` bytes to `buf`.
unsafe { buf.set_len(buf.len() + spare_len) };
}
match ret {
// Host side source exhausted, we're done.
-1 => break,
Expand All @@ -523,16 +530,14 @@ fn read_bytes_source_into(source: BytesSource, buf: &mut Vec<u8>) {
// The host will likely not trigger this branch (current host doesn't),
// but a module should be prepared for it.
0 => {}
INVALID => panic!("invalid source passed"),
_ => unreachable!(),
}
}
}

/// Write `buf` to `sink`.
fn write_to_sink(sink: BytesSink, mut buf: &[u8]) {
const NO_SPACE: u16 = errno::NO_SPACE.get();
const NO_SUCH_BYTES: u16 = errno::NO_SUCH_BYTES.get();

loop {
let len = &mut buf.len();
match unsafe { sys::raw::_bytes_sink_write(sink, buf.as_ptr(), len) } {
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/host/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ fn from_json_seed<'de, T: serde::de::DeserializeSeed<'de>>(s: &'de str, seed: T)
/// Tags for each call that a `WasmInstanceEnv` can make.
#[derive(Debug, Display, Enum, Clone, Copy, strum::AsRefStr)]
pub enum AbiCall {
RowIterBsatnAdvance,
RowIterBsatnClose,
BytesSourceRead,
BytesSinkWrite,

Expand All @@ -151,8 +153,6 @@ pub enum AbiCall {
GetTableId,
Insert,
IterByColEq,
IterDrop,
IterNext,
IterStart,
IterStartFiltered,
ScheduleReducer,
Expand Down
8 changes: 4 additions & 4 deletions crates/core/src/host/wasm_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,10 +247,10 @@ macro_rules! decl_index {
impl ResourceIndex for $name {
type Resource = $resource;
fn from_u32(i: u32) -> Self {
Self(i)
Self(i + 1)
}
fn to_u32(&self) -> u32 {
self.0
self.0 - 1
}
}

Expand Down Expand Up @@ -346,6 +346,8 @@ pub struct AbiRuntimeError {
macro_rules! abi_funcs {
($mac:ident) => {
$mac! {
"spacetime_10.0"::row_iter_bsatn_advance,
"spacetime_10.0"::row_iter_bsatn_close,
"spacetime_10.0"::bytes_source_read,
"spacetime_10.0"::bytes_sink_write,

Expand All @@ -355,8 +357,6 @@ macro_rules! abi_funcs {
"spacetime_10.0"::get_table_id,
"spacetime_10.0"::insert,
"spacetime_10.0"::iter_by_col_eq,
"spacetime_10.0"::iter_drop,
"spacetime_10.0"::iter_advance,
"spacetime_10.0"::iter_start,
"spacetime_10.0"::iter_start_filtered,
"spacetime_10.0"::span_end,
Expand Down
Loading

0 comments on commit 520b9d0

Please sign in to comment.