59 if (!full_path_.empty()) {
// Const accessor whose return type is a const reference to a vector of
// shared_ptr<StreamNode>.
// NOTE(review): the body is not shown here; presumably it returns
// children_ -- confirm against the full file.
72 const std::vector<std::shared_ptr<StreamNode>> &
getChildren()
const {
82 "Cannot reassign a StreamNode's parent node");
89 if (cached_root_ !=
nullptr) {
96 while (parent !=
nullptr) {
97 cached_root_ = parent;
98 parent = cached_root_->parent_;
105 if (is_initialized_) {
123 getRoot()->addStreamListener_(
this);
125 is_initialized_ =
true;
157 "You cannot reset a StreamNode's stream controller");
158 controller_ = controller;
// Constructs a StreamNode from a name. The constructor is `explicit`, so a
// std::string is never implicitly converted into a StreamNode.
//
// name: name for this node. The assertion below fires, with the message
//       shown, when name_ is empty.
//
// NOTE(review): the member-initializer list is not shown here; presumably
// it initializes name_ from `name` -- confirm.
162 explicit StreamNode(
const std::string & name) :
165 sparta_assert(!name_.empty(),
"You may not create a StreamNode without a name");
// Records `listener` in listeners_.
//
// listener: raw, non-owning pointer that is appended to listeners_ as-is;
//           no null check or duplicate check is performed here.
181 void addStreamListener_(StreamNode * listener) {
// Hold listeners_mutex_ for the remainder of the scope while listeners_ is modified.
182 std::lock_guard<std::mutex> guard(listeners_mutex_);
183 listeners_.emplace_back(listener);
// Buffers one vector of values for `listener`.
//
// listener: key used to select the queue inside listeners_data_.
// data:     values to buffer; taken by const reference and pushed onto the
//           queue, so the queue stores its own copy.
195 void appendDataValuesForListener_(StreamNode * listener,
196 const std::vector<double> & data)
// Hold listeners_mutex_ while listeners_data_ is touched.
198 std::lock_guard<std::mutex> guard(listeners_mutex_);
// NOTE(review): operator[] on the standard map types default-constructs an
// empty queue the first time a key is seen; the container type of
// listeners_data_ is not fully visible here -- confirm.
199 listeners_data_[listener].push(data);
// Hands the values buffered for `listener` over to the caller.
//
// listener:   key looked up in listeners_data_.
// data_queue: in/out queue. When an entry for `listener` exists, its queue is
//             swapped with data_queue: the caller receives the buffered
//             vectors, and whatever data_queue held on entry becomes the
//             stored queue.
//
// NOTE(review): the visible lines do nothing when `listener` has no entry;
// the lines following the swap are not shown here -- confirm there is no
// else-branch.
207 void releaseDataBufferForListener_(
208 StreamNode * listener,
209 std::queue<std::vector<double>> & data_queue)
// Hold listeners_mutex_ while listeners_data_ is read and modified.
211 std::lock_guard<std::mutex> guard(listeners_mutex_);
212 auto iter = listeners_data_.find(listener);
213 if (iter != listeners_data_.end()) {
// Swapping exchanges the two queues' contents rather than copying them.
214 std::swap(data_queue, iter->second);
// Rebuilds full_path_ from this node's name_ and the name_ of nodes reached
// through the parent_ chain.
220 void resolveFullPath_() {
// Collect names starting with this node; ancestors are appended after it.
221 std::vector<std::string> path;
222 path.push_back(name_);
224 const StreamNode * parent = parent_;
// NOTE(review): the loop header around the next two statements is not shown
// here; presumably it repeats while `parent` is non-null -- confirm.
226 path.emplace_back(parent->name_);
227 parent = parent->parent_;
// Names were gathered from this node upward, so reverse to put the
// top-most ancestor first.
234 std::reverse(path.begin(), path.end());
235 std::ostringstream oss;
// NOTE(review): the loop body is not shown here; the trailing '.' removal
// below suggests each element is written followed by '.' -- confirm.
236 for (
const auto & p : path) {
240 full_path_ = oss.str();
// Drop a single trailing '.' if the assembled string ends with one.
241 if (!full_path_.empty() && full_path_.back() ==
'.') {
242 full_path_.pop_back();
// Pure virtual hook taking no arguments and returning nothing; every concrete
// subclass must provide an implementation.
// NOTE(review): the call site is not shown here; presumably invoked once
// during node initialization -- confirm.
249 virtual void initialize_() = 0;
// Pure virtual hook that concrete subclasses must implement. It returns a
// const reference to a vector of doubles, so the vector referred to has to
// stay alive for as long as the caller uses the result.
// NOTE(review): non-const and without parameters; whether a call consumes
// data from the underlying stream is not visible here -- confirm.
254 virtual const std::vector<double> & readFromStream_() = 0;
// Name of this node. Declared const; the constructor asserts it is not empty.
257 const std::string name_;
// Path string assembled by resolveFullPath_().
258 std::string full_path_;
// Raw, non-owning pointer to the parent node; nullptr by default.
// resolveFullPath_() follows this pointer from node to node.
260 StreamNode * parent_ =
nullptr;
// Raw pointer, nullptr by default.
// NOTE(review): presumably memoizes the top-most ancestor found by walking
// parent_ -- confirm against getRoot().
261 StreamNode * cached_root_ =
nullptr;
// Child nodes, held through shared_ptr.
263 std::vector<std::shared_ptr<StreamNode>> children_;
// Starts out false.
// NOTE(review): presumably set once initialization has completed -- confirm.
264 bool is_initialized_ =
false;
// Raw, non-owning listener pointers appended by addStreamListener_().
291 std::vector<StreamNode*> listeners_;
// NOTE(review): the opening line of this declaration is not shown here. From
// its uses it is an associative container keyed by listener pointer whose
// mapped value is a queue of std::vector<double> -- confirm the exact type.
295 std::queue<std::vector<double>>> listeners_data_;
// Locked by addStreamListener_(), appendDataValuesForListener_() and
// releaseDataBufferForListener_() before they touch the listener containers.
297 std::mutex listeners_mutex_;
// Stream controller associated with this node, held through shared_ptr.
319 std::shared_ptr<StreamController> controller_;