Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Refactor 1833:use entt as ecs #1834

Draft
wants to merge 38 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
029e405
WIP introduce entt
alexowens90 Sep 9, 2024
1494b30
All processing pipeline tests passing with data stored in entt
alexowens90 Sep 9, 2024
529dad1
REVERT ME: Only run relevant C++ unit tests
alexowens90 Sep 9, 2024
de6367d
Get rid of ComponentMaps
alexowens90 Sep 9, 2024
e6e6dc3
Eliminate id mapping
alexowens90 Sep 9, 2024
41a40d0
Pull out some hard-coded constants
alexowens90 Sep 9, 2024
371cac3
Add arbitrary components to an entity when reading from storage
alexowens90 Sep 11, 2024
170a8f4
All methods to add entties/components now fully type-generic
alexowens90 Sep 11, 2024
4faacc0
Make get_initial_expected_get_calls take a vector of entity ids like …
alexowens90 Sep 11, 2024
51b0ac4
Further templating
alexowens90 Sep 13, 2024
4f39189
Renamed segment_initial_expected_get_calls
alexowens90 Sep 13, 2024
75f59f0
Ensure entities are deleted when no longer needed
alexowens90 Sep 13, 2024
cb1457e
Fixed C++ tests
alexowens90 Sep 13, 2024
037d8bb
Remove component_manager.cpp
alexowens90 Sep 13, 2024
403eba2
Fix benchmark_clause.cpp
alexowens90 Sep 13, 2024
7067360
Convert debug::check to internal::chec, rename some old references to…
alexowens90 Sep 13, 2024
2406570
REVERT ME: instrumented for time interacting with ComponentManager
Sep 16, 2024
1b95096
Only take ComponentManager mutex once in each call to push_entities
Sep 16, 2024
889461b
Pull as much work out of get_entities as possible
Sep 16, 2024
6590249
More instrumentation, only destroy SegmentInMemory component, use con…
Sep 17, 2024
ee15555
Use view to get entities
Sep 17, 2024
6c76c7a
TODOs
Sep 17, 2024
5640301
Use const view
Sep 17, 2024
ac001aa
Remove commented out destroy
Sep 17, 2024
3165ee6
Reset shared ptr instead of erasing component
Sep 18, 2024
2fac258
Added TODO
Sep 18, 2024
6891e1a
Make gathering entities lock-free
Sep 18, 2024
22c14a6
Remove empty check
Sep 18, 2024
75ec37d
Use uint64_t for entity ids
Sep 19, 2024
ccae95d
Revert "Make gathering entities lock-free"
Sep 19, 2024
83321f7
Pin to stable release of entt, add_subdirectory in conda builds too
Sep 19, 2024
384f9ad
Remove unnecessary code
Sep 19, 2024
bf42631
Use version of registry_.create that takes 2 iterators
Sep 19, 2024
c82b23c
Remove most profiling timings
Sep 19, 2024
95e80db
Added comments
Sep 20, 2024
de6a7a6
Fix some C++ tests since ComponentManager API change
Sep 20, 2024
56c1883
Fix remaining tests
Sep 20, 2024
7cda181
Fix Windows segfaults
alexowens90 Sep 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,6 @@
[submodule "cpp/vcpkg"]
path = cpp/vcpkg
url = https://github.com/microsoft/vcpkg.git
[submodule "cpp/third_party/entt"]
path = cpp/third_party/entt
url = https://github.com/skypjack/entt.git
56 changes: 29 additions & 27 deletions cpp/arcticdb/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,7 @@ if (WIN32)
)
endif()

add_compile_definitions(ENTT_ID_TYPE=std::uint64_t)

set (arcticdb_core_libraries
pybind11::module # Transitively includes Python::Module or Python::Python as appropriate
Expand All @@ -656,6 +657,7 @@ set (arcticdb_core_libraries
Folly::folly # Transitively includes: double-conversion, gflags, glog, libevent, libssl, libcrypto, libiberty, libsodium
Azure::azure-identity
Azure::azure-storage-blobs
EnTT::EnTT
)

if(${ARCTICDB_PYTHON_EXPLICIT_LINK})
Expand Down Expand Up @@ -873,33 +875,33 @@ if(${TEST})
python_utils_dump_vars_if_enabled("Python for test compilation")

set(unit_test_srcs
async/test/test_async.cpp
codec/test/test_codec.cpp
codec/test/test_encode_field_collection.cpp
codec/test/test_segment_header.cpp
codec/test/test_encoded_field.cpp
column_store/test/ingestion_stress_test.cpp
column_store/test/test_column.cpp
column_store/test/test_column_data_random_accessor.cpp
column_store/test/test_index_filtering.cpp
column_store/test/test_memory_segment.cpp
entity/test/test_atom_key.cpp
entity/test/test_key_serialization.cpp
entity/test/test_metrics.cpp
entity/test/test_ref_key.cpp
entity/test/test_tensor.cpp
log/test/test_log.cpp
pipeline/test/test_container.hpp
pipeline/test/test_pipeline.cpp
pipeline/test/test_query.cpp
util/test/test_regex.cpp
processing/test/test_arithmetic_type_promotion.cpp
processing/test/test_clause.cpp
processing/test/test_component_manager.cpp
processing/test/test_expression.cpp
processing/test/test_filter_and_project_sparse.cpp
processing/test/test_has_valid_type_promotion.cpp
processing/test/test_operation_dispatch.cpp
# async/test/test_async.cpp
# codec/test/test_codec.cpp
# codec/test/test_encode_field_collection.cpp
# codec/test/test_segment_header.cpp
# codec/test/test_encoded_field.cpp
# column_store/test/ingestion_stress_test.cpp
# column_store/test/test_column.cpp
# column_store/test/test_column_data_random_accessor.cpp
# column_store/test/test_index_filtering.cpp
# column_store/test/test_memory_segment.cpp
# entity/test/test_atom_key.cpp
# entity/test/test_key_serialization.cpp
# entity/test/test_metrics.cpp
# entity/test/test_ref_key.cpp
# entity/test/test_tensor.cpp
# log/test/test_log.cpp
# pipeline/test/test_container.hpp
# pipeline/test/test_pipeline.cpp
# pipeline/test/test_query.cpp
# util/test/test_regex.cpp
# processing/test/test_arithmetic_type_promotion.cpp
# processing/test/test_clause.cpp
# processing/test/test_component_manager.cpp
# processing/test/test_expression.cpp
# processing/test/test_filter_and_project_sparse.cpp
# processing/test/test_has_valid_type_promotion.cpp
# processing/test/test_operation_dispatch.cpp
processing/test/test_resample.cpp
processing/test/test_set_membership.cpp
processing/test/test_signed_unsigned_comparison.cpp
Expand Down
118 changes: 43 additions & 75 deletions cpp/arcticdb/processing/clause.cpp

Large diffs are not rendered by default.

40 changes: 32 additions & 8 deletions cpp/arcticdb/processing/clause.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,35 @@ std::vector<std::vector<size_t>> structure_by_column_slice(std::vector<RangesAnd

std::vector<std::vector<size_t>> structure_all_together(std::vector<RangesAndKey>& ranges_and_keys);

ProcessingUnit gather_entities(std::shared_ptr<ComponentManager> component_manager,
std::vector<EntityId>&& entity_ids,
bool include_atom_keys = false,
bool include_initial_expected_get_calls = false);
/*
* On entry to a clause, construct ProcessingUnits from the input entity IDs. These will either be provided by the
* structure_for_processing method for the first clause in the pipeline, or by the previous clause for all subsequent
* clauses.
*/
template<class... Args>
ProcessingUnit gather_entities(ComponentManager& component_manager, std::vector<EntityId>&& entity_ids) {
ProcessingUnit res;
auto components = component_manager.get_entities<Args...>(entity_ids);
([&]{
auto component = std::move(std::get<std::vector<Args>>(components));
if constexpr (std::is_same_v<Args, std::shared_ptr<SegmentInMemory>>) {
res.set_segments(std::move(component));
} else if constexpr (std::is_same_v<Args, std::shared_ptr<RowRange>>) {
res.set_row_ranges(std::move(component));
} else if constexpr (std::is_same_v<Args, std::shared_ptr<ColRange>>) {
res.set_col_ranges(std::move(component));
} else if constexpr (std::is_same_v<Args, std::shared_ptr<AtomKey>>) {
res.set_atom_keys(std::move(component));
} else if constexpr (std::is_same_v<Args, EntityFetchCount>) {
res.set_entity_fetch_count(std::move(component));
} else {
static_assert(sizeof(Args) == 0, "Unexpected component type provided in gather_entities");
}
}(), ...);
return res;
}

std::vector<EntityId> push_entities(std::shared_ptr<ComponentManager> component_manager, ProcessingUnit&& proc);
std::vector<EntityId> push_entities(ComponentManager& component_manager, ProcessingUnit&& proc, EntityFetchCount entity_fetch_count=1);

std::vector<EntityId> flatten_entities(std::vector<std::vector<EntityId>>&& entity_ids_vec);

Expand Down Expand Up @@ -272,14 +295,15 @@ struct PartitionClause {
if (entity_ids.empty()) {
return {};
}
auto proc = gather_entities(component_manager_, std::move(entity_ids));
auto proc = gather_entities<std::shared_ptr<SegmentInMemory>, std::shared_ptr<RowRange>, std::shared_ptr<ColRange>>(*component_manager_, std::move(entity_ids));
std::vector<ProcessingUnit> partitioned_procs = partition_processing_segment<GrouperType, BucketizerType>(
proc,
ColumnName(grouping_column_),
processing_config_.dynamic_schema_);
std::vector<EntityId> output;
for (auto &&partitioned_proc: partitioned_procs) {
std::vector<EntityId> proc_entity_ids = push_entities(component_manager_, std::move(partitioned_proc));
// EntityFetchCount is 2, once for the repartition, and once for AggregationClause::process
std::vector<EntityId> proc_entity_ids = push_entities(*component_manager_, std::move(partitioned_proc), 2);
output.insert(output.end(), proc_entity_ids.begin(), proc_entity_ids.end());
}
return output;
Expand All @@ -305,7 +329,7 @@ struct PartitionClause {
// Experimentation shows flattening the entities into a single vector and a single call to
// component_manager_->get is faster than not flattening and making multiple calls
auto entity_ids = flatten_entities(std::move(entity_ids_vec));
std::vector<bucket_id> buckets{component_manager_->get<bucket_id>(entity_ids)};
auto [buckets] = component_manager_->get_entities<bucket_id>(entity_ids);
for (auto [idx, entity_id]: folly::enumerate(entity_ids)) {
res[buckets[idx]].emplace_back(entity_id);
}
Expand Down
20 changes: 15 additions & 5 deletions cpp/arcticdb/processing/component_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,27 @@
* As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
*/


#include <atomic>

#include <arcticdb/processing/component_manager.hpp>

namespace arcticdb {

void ComponentManager::set_next_entity_id(EntityId id) {
next_entity_id_ = id;
std::vector<EntityId> ComponentManager::get_new_entity_ids(size_t count) {
std::vector<EntityId> ids(count);
std::lock_guard<std::mutex> lock(mtx_);
registry_.create(ids.begin(), ids.end());
return ids;
}

EntityId ComponentManager::entity_id(const std::optional<EntityId>& id) {
// Do not use value_or as we do not want the side effect of fetch_add when id was provided by the caller
return id.has_value() ? *id : next_entity_id_.fetch_add(1);
void ComponentManager::erase_entity(EntityId id) {
// Ideally would call registry_.destroy(id), or at least registry_.erase<std::shared_ptr<SegmentInMemory>>(id)
// at this point. However, they are both slower than this, so just decrement the ref count of the only
// sizeable component, so that when the shared pointer goes out of scope in the calling function, the
// memory is freed
registry_.get<std::shared_ptr<SegmentInMemory>>(id).reset();
}


} // namespace arcticdb
Loading
Loading