Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .amazonq/rules/memory-bank/structure.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ The main framework implementation containing the execution engine and core abstr
- Algorithm declarations: `provider_node.cpp/hpp`, `declared_transform.cpp/hpp`, `declared_observer.cpp/hpp`, `declared_fold.cpp/hpp`, `declared_unfold.cpp/hpp`, `declared_predicate.cpp/hpp`, `declared_output.cpp/hpp`
- Graph execution: `framework_graph.cpp/hpp`, `make_computational_edges.cpp/hpp`, `producer_catalog.cpp/hpp`
- Data flow: `message.cpp/hpp`, `message_sender.cpp/hpp`, `consumer.cpp/hpp`
- Product management: `product_query.cpp/hpp`, `products_consumer.cpp/hpp`
- Product management: `product_selector.cpp/hpp`, `products_consumer.cpp/hpp`
- Registration: `registrar.cpp/hpp`, `registration_api.cpp/hpp`, `node_catalog.cpp/hpp`
- Utilities: `filter.cpp/hpp`, `glue.cpp/hpp`, `multiplexer.cpp/hpp`
- `fold/`: Fold operation implementations
Expand Down
2 changes: 0 additions & 2 deletions phlex/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ cet_make_library(
provider_node.cpp
registrar.cpp
registration_api.cpp
store_counters.cpp
LIBRARIES
PUBLIC
TBB::tbb
Expand Down Expand Up @@ -67,7 +66,6 @@ install(
registrar.hpp
registration_api.hpp
source.hpp
store_counters.hpp
upstream_predicates.hpp
DESTINATION include/phlex/core
)
Expand Down
5 changes: 2 additions & 3 deletions phlex/core/concepts.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#ifndef PHLEX_CORE_CONCEPTS_HPP
#define PHLEX_CORE_CONCEPTS_HPP

#include "phlex/core/fold/send.hpp"
#include "phlex/core/fwd.hpp"
#include "phlex/metaprogramming/type_deduction.hpp"
#include "phlex/model/fwd.hpp"
Expand All @@ -17,9 +18,7 @@ namespace phlex::experimental {
concept is_bound_object = not std::same_as<T, void_tag>;

template <typename T>
concept sendable = std::move_constructible<T> || requires(T& t) {
{ send(t) } -> std::move_constructible;
};
concept sendable = requires { typename sendable_type<T>; };

template <typename T, std::size_t N>
concept at_least_n_input_parameters = number_parameters<T> >= N;
Expand Down
6 changes: 4 additions & 2 deletions phlex/core/declared_fold.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
namespace phlex::experimental {
declared_fold::declared_fold(algorithm_name name,
std::vector<std::string> predicates,
product_selectors input_products) :
products_consumer{std::move(name), std::move(predicates), std::move(input_products)}
product_selectors input_products,
std::string partition_layer) :
products_consumer{std::move(name), std::move(predicates), std::move(input_products)},
partition_layer_{std::move(partition_layer)}
{
}

Expand Down
170 changes: 59 additions & 111 deletions phlex/core/declared_fold.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@
#include "phlex/concurrency.hpp"
#include "phlex/core/concepts.hpp"
#include "phlex/core/fold/send.hpp"
#include "phlex/core/fold_join_node.hpp"
#include "phlex/core/fwd.hpp"
#include "phlex/core/input_arguments.hpp"
#include "phlex/core/message.hpp"
#include "phlex/core/multilayer_join_node.hpp"
#include "phlex/core/product_selector.hpp"
#include "phlex/core/products_consumer.hpp"
#include "phlex/core/store_counters.hpp"
#include "phlex/model/algorithm_name.hpp"
#include "phlex/model/data_cell_index.hpp"
#include "phlex/model/handle.hpp"
Expand Down Expand Up @@ -40,26 +39,44 @@ namespace phlex::experimental {
public:
declared_fold(algorithm_name name,
std::vector<std::string> predicates,
product_selectors input_products);
product_selectors input_products,
std::string partition_layer);
~declared_fold() override;

virtual tbb::flow::sender<message>& output_port() = 0;
virtual tbb::flow::receiver<flush_message>& flush_port() = 0;
virtual product_specifications const& output() const = 0;
virtual tbb::flow::receiver<index_message>& partition_port() = 0;
virtual std::size_t product_count() const = 0;
identifier const& partition_layer() const { return partition_layer_; }

private:
identifier partition_layer_;
};

using declared_fold_ptr = std::unique_ptr<declared_fold>;
using declared_folds = simple_ptr_map<declared_fold_ptr>;

template <typename FoldResult, typename InitTuple, std::size_t... Is>
auto make_initializer(InitTuple args, std::index_sequence<Is...>)
{
// The compiler emits a warning if an empty args tuple is captured by reference and then not expanded inside the lambda expression. We therefore only capture args in the lambda expression if sizeof...(Is) > 0.
if constexpr (sizeof...(Is) == 0) {
return [](data_cell_index const&) { return std::make_unique<FoldResult>(); };
} else {
return [args](data_cell_index const&) {
return std::make_unique<FoldResult>(std::get<Is>(args)...);
};
}
}

// =====================================================================================

template <typename AlgorithmBits, typename InitTuple>
class fold_node : public declared_fold, private count_stores {
class fold_node : public declared_fold {
using all_parameter_types = typename AlgorithmBits::input_parameter_types;
using result_type = std::decay_t<std::tuple_element_t<0, all_parameter_types>>;
using input_parameter_types = skip_first_type<all_parameter_types>; // Skip fold object
static constexpr auto num_inputs = std::tuple_size_v<input_parameter_types>;
using result_type = std::decay_t<std::tuple_element_t<0, all_parameter_types>>;

static constexpr std::size_t num_outputs = 1; // hard-coded for now
using function_t = typename AlgorithmBits::bound_type;
Expand All @@ -73,138 +90,69 @@ namespace phlex::experimental {
InitTuple initializer,
product_selectors input_products,
std::vector<std::string> output,
std::string partition) :
declared_fold{std::move(algo_name), std::move(predicates), std::move(input_products)},
initializer_{std::move(initializer)},
std::string partition_layer) :
declared_fold{std::move(algo_name),
std::move(predicates),
std::move(input_products),
std::move(partition_layer)},
output_{to_product_specifications(name(), std::move(output), make_type_ids<result_type>())},
partition_{std::move(partition)},
flush_receiver_{g,
tbb::flow::unlimited,
[this](flush_message const& msg) -> tbb::flow::continue_msg {
auto const& [index, counts, original_message_id] = msg;
if (index->layer_name() != partition_) {
return {};
}

counter_for(index->hash()).set_flush_value(counts, original_message_id);
emit_and_evict_if_done(index);
return {};
}},
join_{make_join_or_none<num_inputs>(
g, name().to_string(), layers())}, // FIXME: This should change to include result product!
join_{g,
name().to_string(),
this->partition_layer(),
layers(),
this->output(),
make_initializer<result_type>(
std::move(initializer), std::make_index_sequence<std::tuple_size_v<InitTuple>>{})},
fold_{g,
concurrency,
[this, ft = alg.release_algorithm()](messages_t<num_inputs> const& messages, auto&) {
// N.B. The assumption is that a fold will *never* need to cache
// the product store it creates. Any flush messages *do not* need
// to be propagated to downstream nodes.
auto const& msg = most_derived(messages);
auto const& index = msg.store->index();

auto fold_index = index->parent(partition_);
if (not fold_index) {
return;
}

auto index_hash_for_counter = fold_index->hash();
[this, ft = alg.release_algorithm()](
accumulator_with_messages<result_type, num_inputs> const& accum_with_msgs, auto&) {
std::size_t const partition_hash = apply_fold(ft, accum_with_msgs);

call(ft, messages, std::make_index_sequence<num_inputs>{});
++calls_;

counter_for(index_hash_for_counter).increment(index->layer_hash());

emit_and_evict_if_done(fold_index);
join_.notify_result_repeater_port().try_put(partition_hash);
}}
{
if constexpr (num_inputs > 1ull) {
make_edge(join_, fold_);
}
make_edge(join_, fold_);
}

private:
void emit_and_evict_if_done(data_cell_index_ptr const& fold_index)
{
if (auto counter = done_with(fold_index->hash())) {
auto parent = std::make_shared<product_store>(fold_index, name());
commit(parent);
++product_count_;
tbb::flow::output_port<0>(fold_).try_put(
{.store = parent, .id = counter->original_message_id()});
}
}

tbb::flow::receiver<message>& port_for(product_selector const& input_product) override
{
return receiver_for<num_inputs>(join_, input(), input_product, fold_);
return receiver_for(join_, input(), input_product);
}

std::vector<tbb::flow::receiver<message>*> ports() override
{
return input_ports<num_inputs>(join_, fold_);
}
std::vector<tbb::flow::receiver<message>*> ports() override { return input_ports(join_); }

tbb::flow::receiver<flush_message>& flush_port() override { return flush_receiver_; }
tbb::flow::sender<message>& output_port() override { return tbb::flow::output_port<0>(fold_); }
tbb::flow::receiver<index_message>& partition_port() override { return join_.partition_port(); }
tbb::flow::sender<message>& output_port() override { return join_.output_port(); }
product_specifications const& output() const override { return output_; }

template <std::size_t... Is>
void call(function_t const& ft,
messages_t<num_inputs> const& messages,
std::index_sequence<Is...>)
{
auto const parent_index = most_derived(messages).store->index()->parent(partition_);

// FIXME: Not the safest approach!
auto it = results_.find(parent_index->hash());
if (it == results_.end()) {
it = results_
.insert({parent_index->hash(),
initialized_object(
initializer_, std::make_index_sequence<std::tuple_size_v<InitTuple>>{})})
.first;
}

if constexpr (num_inputs == 1ull) {
std::invoke(ft, *it->second, std::get<Is>(input_).retrieve(messages)...);
} else {
std::invoke(ft, *it->second, std::get<Is>(input_).retrieve(std::get<Is>(messages))...);
}
}

named_index_ports index_ports() final { return join_.index_ports(); }
std::size_t num_calls() const final { return calls_.load(); }
std::size_t product_count() const final { return product_count_.load(); }

template <size_t... Is>
auto initialized_object(InitTuple const& init, std::index_sequence<Is...>) const
{
return std::unique_ptr<result_type>(new result_type{std::get<Is>(init)...});
}
std::size_t product_count() const final { return join_.emitted_result_count(); }

auto commit(product_store_ptr& store)
std::size_t apply_fold(
function_t const& ft,
accumulator_with_messages<result_type, num_inputs> const& accum_with_msgs)
{
auto& result = results_.at(store->index()->hash());
if constexpr (requires { send(*result); }) {
store->add_product(output()[0], send(*result));
} else {
store->add_product(output()[0], std::move(*result));
}
// Reclaim some memory; it would be better to erase the entire entry from the map,
// but that is not thread-safe.
result.reset();
// We have to do awkward index management until we can use structured bindings with packs.
auto& accumulator = std::get<0>(accum_with_msgs);
[&]<std::size_t... Is>(std::index_sequence<Is...>) {
accumulator.partial_result->call(
ft, std::get<Is>(input_).retrieve(std::get<Is + 1>(accum_with_msgs))...);
}(std::make_index_sequence<num_inputs>{});
return accumulator.index->hash();
}

InitTuple initializer_;
input_retriever_types<input_parameter_types> input_{input_arguments<input_parameter_types>()};
product_specifications output_;
identifier partition_;
tbb::flow::function_node<flush_message> flush_receiver_;
join_or_none_t<num_inputs> join_;
tbb::flow::multifunction_node<messages_t<num_inputs>, message_tuple<1>> fold_;
tbb::concurrent_unordered_map<data_cell_index::hash_type, std::unique_ptr<result_type>>
results_;
fold_join_node<result_type, num_inputs> join_;
tbb::flow::multifunction_node<accumulator_with_messages<result_type, num_inputs>,
message_tuple<1>>
fold_;
std::atomic<std::size_t> calls_;
std::atomic<std::size_t> product_count_;
};
}

Expand Down
1 change: 0 additions & 1 deletion phlex/core/declared_observer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
#include "phlex/core/multilayer_join_node.hpp"
#include "phlex/core/product_selector.hpp"
#include "phlex/core/products_consumer.hpp"
#include "phlex/core/store_counters.hpp"
#include "phlex/metaprogramming/type_deduction.hpp"
#include "phlex/model/algorithm_name.hpp"
#include "phlex/model/data_cell_index.hpp"
Expand Down
2 changes: 1 addition & 1 deletion phlex/core/declared_unfold.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ namespace phlex::experimental {
product_store_const_ptr generator::make_child(std::size_t const i, products new_products)
{
auto child_index = parent_->index()->make_child(child_layer_name_, i);
++child_counts_;
++child_count_;
return std::make_shared<product_store>(child_index, node_name_, std::move(new_products));
}

Expand Down
4 changes: 2 additions & 2 deletions phlex/core/declared_unfold.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ namespace phlex::experimental {
std::string const& child_layer_name);

std::size_t child_layer_hash() const { return child_layer_hash_; }
std::size_t child_count() const { return child_counts_; }
std::size_t child_count() const { return child_count_; }
product_store_const_ptr make_child(std::size_t i, products new_products);

private:
Expand All @@ -52,7 +52,7 @@ namespace phlex::experimental {
// NOLINTNEXTLINE(cppcoreguidelines-avoid-const-or-ref-data-members)
std::string const& child_layer_name_;
std::size_t child_layer_hash_;
std::size_t child_counts_ = 0;
std::size_t child_count_ = 0;
};

class PHLEX_CORE_EXPORT declared_unfold : public products_consumer {
Expand Down
Loading
Loading