Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP Arrow support #1789

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,6 @@
[submodule "cpp/vcpkg"]
path = cpp/vcpkg
url = https://github.com/microsoft/vcpkg.git
[submodule "cpp/third_party/sparrow"]
path = cpp/third_party/sparrow
url = https://github.com/man-group/sparrow.git
8 changes: 7 additions & 1 deletion cpp/arcticdb/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ else()
find_package(PCRE REQUIRED)
find_package(Libevent REQUIRED)
find_package(semimap REQUIRED)
find_package(sparrow REQUIRED)

find_package(recycle REQUIRED)
find_package(msgpack-c REQUIRED)
Expand Down Expand Up @@ -503,7 +504,12 @@ set(arcticdb_srcs
version/symbol_list.cpp
version/version_map_batch_methods.cpp
storage/s3/ec2_utils.cpp
util/buffer_holder.cpp)
util/buffer_holder.cpp
util/native_handler.hpp
arrow/arrow_output_frame.hpp
arrow/arrow_output_frame.cpp
arrow/arrow_utils.hpp
arrow/test/test_arrow.cpp)

add_library(arcticdb_core_object OBJECT ${arcticdb_srcs})

Expand Down
22 changes: 22 additions & 0 deletions cpp/arcticdb/arrow/arrow_output_frame.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/* Copyright 2023 Man Group Operations Limited
*
* Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.
*
* As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
*/


#include <arcticdb/arrow/arrow_output_frame.hpp>

namespace arcticdb {

// Constructs an output frame over `frame`, taking a share of `buffers`.
//
// names_ is sized to the number of fields in the frame minus the number of
// index fields reported by the descriptor; index_columns_ is sized to the
// number of index fields. Both vectors are only sized here, not populated.
// NOTE(review): the subtraction is unsigned - presumably fields().size() is
// always >= index().field_count(); confirm against callers.
ArrowOutputFrame::ArrowOutputFrame(const SegmentInMemory& frame, std::shared_ptr<BufferHolder> buffers)
    : module_data_(ModuleData::instance())
    , frame_(frame)
    , names_(frame.fields().size() - frame.descriptor().index().field_count())
    , index_columns_(frame.descriptor().index().field_count())
    , buffers_(std::move(buffers)) {
}

} // namespace arcticdb
30 changes: 30 additions & 0 deletions cpp/arcticdb/arrow/arrow_output_frame.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/* Copyright 2023 Man Group Operations Limited
*
* Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.
*
* As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
*/

#pragma once

#include <arcticdb/pipeline/frame_data_wrapper.hpp>
#include <arcticdb/util/buffer_holder.hpp>

#include <memory>

namespace arcticdb {

class SegmentInMemory;

/**
 * Holds a segment together with the bookkeeping needed to expose it as an
 * Arrow output frame.
 *
 * NOTE(review): work in progress - the class currently only stores state and
 * exposes no accessors.
 */
class ArrowOutputFrame {
public:
    /**
     * @param frame   Segment to wrap; stored by value in frame_.
     * @param buffers Buffer holder shared with this frame.
     */
    // The constructor must be public: under the class-default (private) access
    // it could not be called from outside the class at all.
    ArrowOutputFrame(const SegmentInMemory& frame, std::shared_ptr<BufferHolder> buffers);

private:
    std::shared_ptr<ModuleData> module_data_;            // obtained from ModuleData::instance()
    SegmentInMemory frame_;                              // the wrapped segment
    std::vector<std::string> names_;                     // one slot per non-index field
    std::vector<std::string> index_columns_;             // one slot per index field
    std::weak_ptr<pipelines::FrameDataWrapper> arrays_;  // not set by the constructor
    std::shared_ptr<BufferHolder> buffers_;              // shared buffer holder passed at construction
};
}
51 changes: 51 additions & 0 deletions cpp/arcticdb/arrow/arrow_utils.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/* Copyright 2023 Man Group Operations Limited
*
* Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.
*
* As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
*/

#pragma once

#include <sparrow/array/array_data.hpp>

Check failure on line 10 in cpp/arcticdb/arrow/arrow_utils.hpp

View workflow job for this annotation

GitHub Actions / Linux C++ Tests / compile (linux, linux, manylinux_x86_64, /tmp/cpp_build, *.gz, *.so, *.[ao], vcpkg_installed, mon...

sparrow/array/array_data.hpp: No such file or directory

Check failure on line 10 in cpp/arcticdb/arrow/arrow_utils.hpp

View workflow job for this annotation

GitHub Actions / code_coverage

sparrow/array/array_data.hpp: No such file or directory

Check failure on line 10 in cpp/arcticdb/arrow/arrow_utils.hpp

View workflow job for this annotation

GitHub Actions / Windows C++ Tests / compile (windows, windows-cl, win_amd64, C:/cpp_build, C:/vcpkg_packages, *.pdb, *.lib, *.ilk, *....

Cannot open include file: 'sparrow/array/array_data.hpp': No such file or directory
#include <sparrow/array.hpp>
#include <sparrow/arrow_interface/array_data_to_arrow_array_converters.hpp>

#include <arcticdb/column_store/memory_segment.hpp>

namespace arcticdb {

/**
 * Converts a single-block, non-sequence column into a sparrow Arrow array.
 *
 * The column's type tag is visited to recover the raw element type; the one
 * data block of the column is then wrapped in a sparrow buffer and handed to
 * to_arrow_array_unique_ptr.
 *
 * Declared `inline` because it is defined in a header: without it every
 * translation unit including this file would emit its own definition and the
 * link would fail with duplicate symbols.
 *
 * @param column Column to convert. Must consist of exactly one block.
 * @return The Arrow array produced by to_arrow_array_unique_ptr.
 * @note Fails via util::check if the column does not have exactly one block,
 *       and via util::raise_rte for sequence types, which are not implemented.
 */
inline sparrow::arrow_array_unique_ptr arrow_data_from_column(const Column& column) {
    return column.type().visit_tag([&](auto && impl) -> sparrow::arrow_array_unique_ptr {
        using TagType = std::decay_t<decltype(impl)>;
        // `typename` is spelled out so the dependent names also parse on
        // compilers that do not implement the C++20 relaxation.
        using DataType = typename TagType::DataTypeTag;
        using RawType = typename DataType::raw_type;
        if constexpr (!is_sequence_type(DataType::data_type)) {
            sparrow::array_data data;
            data.type = sparrow::data_descriptor(sparrow::arrow_traits<RawType>::type_id);

            auto column_data = column.data();
            util::check(column_data.num_blocks() == 1, "Expected single block in arrow conversion");
            auto block = column_data.next<TagType>().value();
            // NOTE(review): the const_cast exposes the column's memory as mutable to
            // sparrow. Confirm whether sparrow::buffer copies, borrows or takes
            // ownership of this pointer - if it takes ownership the column's block
            // would be freed twice.
            sparrow::buffer<RawType> buffer(const_cast<RawType *>(block.data()), block.row_count());

            data.buffers.push_back(buffer);
            data.length = static_cast<std::int64_t>(block.row_count());
            data.offset = static_cast<std::int64_t>(0);
            // NOTE(review): appends one default-constructed child entry; confirm this
            // is what sparrow expects for a primitive array.
            data.child_data.emplace_back();

            return to_arrow_array_unique_ptr(std::move(data));
        } else {
            util::raise_rte("Sequence types not implemented");
        }
    });
}
/*
std::vector<sparrow::arrow_array_unique_ptr> segment_to_arrow_arrays(SegmentInMemory& segment) {
std::vector<sparrow::arrow_array_unique_ptr> output;
for(auto& column : segment.column()) {
output.emplace_back(arrow_data_from_column(column));
}
}
*/
} // namespace arcticdb
16 changes: 16 additions & 0 deletions cpp/arcticdb/arrow/test/test_arrow.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/* Copyright 2023 Man Group Operations Limited
*
* Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.
*
* As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
*/

#include <gtest/gtest.h>

#include <arcticdb/arrow/arrow_utils.hpp>

// Smoke test: runs the Arrow conversion on a default-constructed column.
// No assertions are made on the result; the test only exercises the call.
TEST(Arrow, ConvertColumn) {
    arcticdb::Column column;
    auto arrow_array = arcticdb::arrow_data_from_column(column);
}
8 changes: 6 additions & 2 deletions cpp/arcticdb/stream/aggregator-inl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,20 @@ inline void Aggregator<Index, Schema, SegmentingPolicy, DensityPolicy>::commit_i
stats_.reset();
}

/// Returns true when `segment` has at least one row, carries metadata, or has
/// an index descriptor. The checks are evaluated in that order and stop at the
/// first one that holds.
inline bool has_something_to_write(const SegmentInMemory& segment) {
    if (segment.row_count() > 0) {
        return true;
    }
    if (segment.metadata()) {
        return true;
    }
    if (segment.has_index_descriptor()) {
        return true;
    }
    return false;
}

/// Commits the current segment if it has anything to write (rows, metadata or
/// an index descriptor, as decided by has_something_to_write); otherwise does
/// nothing. Forwards to commit_impl(false).
/// NOTE(review): the flag presumably means "not final" - confirm against
/// commit_impl.
template<class Index, class Schema, class SegmentingPolicy, class DensityPolicy>
inline void Aggregator<Index, Schema, SegmentingPolicy, DensityPolicy>::commit() {
    // Single condition via the shared helper; the older hand-written condition
    // is dropped so the check lives in one place and the braces balance.
    if (ARCTICDB_LIKELY(has_something_to_write(segment_))) {
        commit_impl(false);
    }
}

/// Commits the current segment one last time if it has anything to write
/// (rows, metadata or an index descriptor, as decided by
/// has_something_to_write); otherwise does nothing. Forwards to
/// commit_impl(true).
/// NOTE(review): the flag presumably marks the final commit - confirm against
/// commit_impl.
template<class Index, class Schema, class SegmentingPolicy, class DensityPolicy>
inline void Aggregator<Index, Schema, SegmentingPolicy, DensityPolicy>::finalize() {
    // Single condition via the shared helper; the older hand-written condition
    // is dropped so the check lives in one place and the braces balance.
    if (ARCTICDB_LIKELY(has_something_to_write(segment_))) {
        commit_impl(true);
    }
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/version/test/test_version_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -903,7 +903,7 @@ TEST(VersionMap, CompactionUpdateCache) {
version_map->write_version(store, key, std::nullopt);
}

auto assert_keys_in_entry_and_store = [&store, &id](std::shared_ptr<VersionMapEntry> entry, int expected_version_keys, int expected_index_keys, int expected_tombstone_keys){
auto assert_keys_in_entry_and_store = [&store](std::shared_ptr<VersionMapEntry> entry, int expected_version_keys, int expected_index_keys, int expected_tombstone_keys){
int present_version_keys = 0, present_index_keys = 0, present_tombstone_keys = 0;
auto all_entry_keys = entry->keys_;
if (entry->head_) all_entry_keys.push_back(entry->head_.value());
Expand Down
1 change: 1 addition & 0 deletions cpp/third_party/sparrow
Submodule sparrow added at 6b0cff
Loading